diff --git a/integration/caching_test.js b/integration/caching_test.js index 67056859629a37645723e925f5c3c0bb94a0a3ab..effaa060f023e83ca16f6319e94e79e91a2c6938 100644 --- a/integration/caching_test.js +++ b/integration/caching_test.js @@ -1,7 +1,6 @@ import request from 'supertest' import { expect } from 'chai' import { generateSimpleViteManifest, mockApp, mockConfig, mockFetch } from '../spec/util.js' -import { client } from '../src/redis.js' import * as td from 'testdouble' import { getRedisKey } from '../src/util.js' @@ -9,7 +8,6 @@ describe('File caching service', function () { let app beforeEach(async function () { - await client.flushdb() mockConfig({ urls: ['http://ui-server/'] }) mockFetch({ 'http://ui-server': { @@ -19,6 +17,7 @@ describe('File caching service', function () { '/index.html': () => new Response('<html><head></head><body>it\'s me</body></html>', { headers: { 'content-type': 'text/html' } }) } }) + await import('../src/redis.js').then(({ client }) => client.flushdb()) app = await mockApp() }) @@ -31,6 +30,7 @@ describe('File caching service', function () { expect(response.statusCode).to.equal(200) const version = response.headers.version + const { client } = await import('../src/redis.js') expect(await client.get(getRedisKey({ version, name: 'viteManifests' }))).to.equal('{"index.html":{"file":"index.html","meta":{"baseUrl":"http://ui-server/"}}}') expect(await client.get(getRedisKey({ version, name: 'oxManifests' }))).to.equal('[]') }) @@ -40,6 +40,7 @@ describe('File caching service', function () { expect(response.statusCode).to.equal(200) const version = response.headers.version + const { client } = await import('../src/redis.js') const body = (await client.getBuffer(getRedisKey({ version, name: '/index.html:body' }))) || '' expect(body.toString()).to.equal('<html><head></head><body>it\'s me</body></html>') const meta = await client.get(getRedisKey({ version, name: '/index.html:meta' })) @@ -48,6 +49,7 @@ describe('File caching service', function () { it('serves files from redis and stores them in local cache', async function () { const version = '12345' + const { client } = await import('../src/redis.js') await client.set(getRedisKey({ version, name: '/demo.js:meta' }), '{"headers":{"content-type":"application/javascript","dependencies":false}}') await client.set(getRedisKey({ version, name: '/demo.js:body' }), 'console.log("Demo")') diff --git a/integration/config_test.js b/integration/config_test.js index de0cbdfd60d9d9d59431d58579916085b8089b99..cde423c099998a9f1028fdc8e7f709abb7840c45 100644 --- a/integration/config_test.js +++ b/integration/config_test.js @@ -1,7 +1,6 @@ import request from 'supertest' import { expect } from 'chai' import { generateSimpleViteManifest, mockApp, mockConfig, mockFetch } from '../spec/util.js' -import { client, closeQueue, getQueues, pubClient } from '../src/redis.js' import * as td from 'testdouble' import { getRedisKey } from '../src/util.js' @@ -12,7 +11,6 @@ describe('Configuration', function () { beforeEach(async function () { // need to set the redis-prefix. Otherwise, the bull workers will interfere process.env.REDIS_PREFIX = Math.random().toString() - await client.flushdb() mockConfig(config = { urls: ['http://ui-server/'] }) mockFetch({ 'http://ui-server': { @@ -26,26 +24,26 @@ describe('Configuration', function () { ) } }) + await import('../src/redis.js').then(({ client }) => client.flushdb()) app = await mockApp() }) afterEach(async function () { - td.reset() + const { getQueues, closeQueue } = await import('../src/redis.js') for (const queue of getQueues()) { await closeQueue(queue.name) } + td.reset() // reset, after the queues were removed process.env.REDIS_PREFIX = 'ui-middleware' }) it('updates the configuration when updated on a different node', async function () { - // need to do this with dynamic import such that the mocked config is used - await import('../src/create-queues.js').then(({ default: createQueues }) => createQueues()) - const response = await request(app.server).get('/meta') expect(response.body).to.have.length(2) config.urls = [] + const { pubClient } = await import('../src/redis.js') pubClient.publish(getRedisKey({ name: 'updateLatestVersion' }), '1234') await new Promise(resolve => setTimeout(resolve, 200)) diff --git a/integration/update-version_test.js b/integration/update-version_test.js index cbd09e1dc0e7544ad3b5c8e870fef4aeaec12a5f..af2244cb5c5c0a55702438073d488c58e5e17ad4 100644 --- a/integration/update-version_test.js +++ b/integration/update-version_test.js @@ -1,7 +1,6 @@ import request from 'supertest' import { expect } from 'chai' import { generateSimpleViteManifest, mockApp, mockConfig, mockFetch } from '../spec/util.js' -import { client, closeQueue, getQueue, getQueues, pubClient } from '../src/redis.js' import * as td from 'testdouble' import { getRedisKey } from '../src/util.js' @@ -11,7 +10,6 @@ describe('Updates the version', function () { beforeEach(async function () { // need to set the redis-prefix. Otherwise, the bull workers will interfere process.env.REDIS_PREFIX = Math.random().toString() - await client.flushdb() mockConfig({ urls: ['http://ui-server/'] }) mockFetch({ 'http://ui-server': { @@ -26,27 +24,26 @@ describe('Updates the version', function () { ) } }) + await import('../src/redis.js').then(({ client }) => client.flushdb()) app = await mockApp() }) afterEach(async function () { - td.reset() - process.env.CACHE_TTL = '30000' + const { getQueues, closeQueue } = await import('../src/redis.js') for (const queue of getQueues()) { await closeQueue(queue.name) } + td.reset() // reset, after the queues were removed process.env.REDIS_PREFIX = 'ui-middleware' }) it('with manually triggered job', async function () { - // need to do this with dynamic import such that the mocked config is used - await import('../src/create-queues.js').then(({ default: createQueues }) => createQueues()) - const responseBeforeUpdate = await request(app.server).get('/index.html') expect(responseBeforeUpdate.statusCode).to.equal(200) expect(responseBeforeUpdate.headers.version).to.equal('85101541') + const { getQueue } = await import('../src/redis.js') // update has only been registered but not executed yet expect(await getQueue('update-version').add({}).then(job => job.finished())).to.equal('85101541') // update is executed with the second iteration @@ -58,24 +55,27 @@ describe('Updates the version', function () { }) it('with automatically triggered job', async function () { - process.env.CACHE_TTL = '100' - const responseBeforeUpdate = await request(app.server).get('/index.html') expect(responseBeforeUpdate.statusCode).to.equal(200) expect(responseBeforeUpdate.headers.version).to.equal('85101541') - // need to do this with dynamic import such that the mocked config is used - await import('../src/create-queues.js').then(({ default: createQueues }) => createQueues()) - + // speed up the update process + const { subClient, getQueue } = await import('../src/redis.js') const queue = getQueue('update-version') - let count = 0 - await new Promise(resolve => queue.on('global:completed', (jobId, result) => { - // only resolve when the second job has been completed as the "update" job needs to be executed twice - if (++count === 1) return - // pause the queue to prevent any further updates - queue.pause() - resolve() - })) + queue.add({}, { + jobId: 'update-version-job', + repeat: { every: 100 } + }) + + // wait for the update event to happen + await new Promise(resolve => { + const key = getRedisKey({ name: 'updateLatestVersion' }) + subClient.subscribe(key) + subClient.on('message', async (channel, version) => { + if (channel !== key) return + resolve() + }) + }) const responseAfterUpdate = await request(app.server).get('/index.html') expect(responseAfterUpdate.statusCode).to.equal(200) @@ -83,13 +83,11 @@ describe('Updates the version', function () { }) it('receives version update via redis event', async function () { - // need to do this with dynamic import such that the mocked config is used - await import('../src/create-queues.js').then(({ default: createQueues }) => createQueues()) - const responseBeforeUpdate = await request(app.server).get('/index.html') expect(responseBeforeUpdate.statusCode).to.equal(200) expect(responseBeforeUpdate.headers.version).to.equal('85101541') + const { pubClient } = await import('../src/redis.js') // just publish event, don't change the value on redis. pubClient.publish(getRedisKey({ name: 'updateLatestVersion' }), '1234') await new Promise(resolve => setTimeout(resolve, 10)) @@ -104,9 +102,6 @@ describe('Updates the version', function () { td.reset() // need to set the redis-prefix. Otherwise, the bull workers will interfere process.env.REDIS_PREFIX = Math.random().toString() - await client.flushdb() - // preconfigure redis - await client.set(getRedisKey({ name: 'latestVersion' }), '12345') mockConfig({ urls: ['http://ui-server/'] }) mockFetch({ 'http://ui-server': { @@ -120,6 +115,11 @@ describe('Updates the version', function () { ) } }) + + const { client } = await import('../src/redis.js') + await client.flushdb() + // preconfigure redis + await client.set(getRedisKey({ name: 'latestVersion' }), '12345') app = await mockApp() }) diff --git a/spec/redis_test.js b/spec/redis_test.js index b1d2e46f4784b3f7775e884a27c779a2e7487a44..d8c30c2d88b7ea77ae15fe11414548962b9522f1 100644 --- a/spec/redis_test.js +++ b/spec/redis_test.js @@ -11,8 +11,6 @@ describe('Redis', function () { let spy beforeEach(async function () { - // no redis mock!! - await import('../src/create-queues.js').then(({ default: createQueues }) => createQueues()) mockConfig({ urls: ['http://ui-server/'] }) mockFetch({ 'http://ui-server': { diff --git a/src/create-queues.js b/src/create-queues.js deleted file mode 100644 index d7fd31384032ccf94dbfb457a4c061e0dbb397ef..0000000000000000000000000000000000000000 --- a/src/create-queues.js +++ /dev/null @@ -1,23 +0,0 @@ -import * as redis from './redis.js' -import { updateVersionProcessor, registerLatestVersionListener } from './version.js' - -const { getQueue, subClient } = redis - -export default function createQueues () { - if (redis.isEnabled()) { - const updateVersionQueue = getQueue('update-version') - updateVersionQueue.process(updateVersionProcessor) - updateVersionQueue.add({}, { - jobId: 'update-version-job', - repeat: { every: Number(process.env.CACHE_TTL) }, - removeOnComplete: 10, - removeOnFail: 10, - timeout: 10000 - }) - - // not a queue but though, used by redis - registerLatestVersionListener(subClient) - } else { - setInterval(updateVersionProcessor, Number(process.env.CACHE_TTL)) - } -} diff --git a/src/index.js b/src/index.js index 992a191719bf34ed0e3b4de07470f7ed6326c461..1c1caa5310aa022539877f14939b15003c793c98 100644 --- a/src/index.js +++ b/src/index.js @@ -3,11 +3,9 @@ import { config } from 'dotenv-defaults' import { logger } from './logger.js' import { createApp } from './create-app.js' -import createQueues from './create-queues.js' config() const app = await createApp() -createQueues() // Binds and listens for connections on the specified host and port app.listen({ host: '::', port: Number(process.env.PORT) }) diff --git a/src/redis.js b/src/redis.js index a26432a97be863c7e37444cd035b0c578842b69c..4184691302bbdf0a314d1e70f0e28a1e35d9fc97 100644 --- a/src/redis.js +++ b/src/redis.js @@ -1,6 +1,7 @@ import Redis from 'ioredis' import { logger, timeSinceStartup } from './logger.js' import Queue from 'bull' +import { registerLatestVersionListener, updateVersionProcessor } from './version.js' const commonQueueOptions = { enableReadyCheck: false, maxRetriesPerRequest: null } @@ -11,10 +12,12 @@ const createClient = (type, options = {}) => { get () {}, set () {}, del () {}, - flushdb () {}, + flushdb () { return {} }, status: '', duplicate () { return new Redis() }, - publish () {} + publish () {}, + subscribe () {}, + on () {} }, { get () { throw new Error('Redis is disabled. Check for redis.isEnabled()') @@ -48,6 +51,34 @@ export const subClient = createClient('sub client', commonQueueOptions) /* * Bull specific things are below */ + +if (isEnabled()) { + client.on('ready', () => { + const updateVersionQueue = getQueue('update-version') + try { + updateVersionQueue.process(updateVersionProcessor) + logger.debug('[Redis] Register version update processor.') + } catch (err) { + logger.debug('[Redis] Update version processor is already registered.') + } + updateVersionQueue.add({}, { + jobId: 'update-version-job', + repeat: { every: Number(process.env.CACHE_TTL) }, + removeOnComplete: 10, + removeOnFail: 10, + timeout: 10000 + }) + }) + + registerLatestVersionListener(subClient) +} else { + setInterval(updateVersionProcessor, Number(process.env.CACHE_TTL)) +} + +/* + * queue specific code + */ + const queues = {} export function getQueue (name) {