diff --git a/integration/caching_test.js b/integration/caching_test.js index 2ee477b2fadd7f826398852d080b19a5c9b303a7..d92df150d5ed226418bb954a3b6fdbf4f588c543 100644 --- a/integration/caching_test.js +++ b/integration/caching_test.js @@ -39,8 +39,10 @@ describe('File caching service', function () { '/index.html': () => new Response('<html><head></head><body>it\'s me</body></html>', { headers: { 'content-type': 'text/html' } }) } }) - await import('../src/redis.js').then(({ client, createClient }) => { + await import('../src/redis.js').then(({ switchClient, createClient }) => { pubClient = createClient() + const client = createClient('common client') + switchClient(client) return client.flushdb() }) await import('../src/version.js').then(async ({ updateVersionProcessor }) => { diff --git a/integration/config_test.js b/integration/config_test.js index 66fb7dc5d38c266120a9f68ecbe33e0a384d9ba9..e8f60fb83350cf85d65216d23eb9eac2a011baaf 100644 --- a/integration/config_test.js +++ b/integration/config_test.js @@ -46,8 +46,10 @@ describe('Configuration', function () { ) } }) - await import('../src/redis.js').then(({ client, createClient }) => { + await import('../src/redis.js').then(({ switchClient, createClient }) => { pubClient = createClient() + const client = createClient('common client') + switchClient(client) return client.flushdb() }) await import('../src/version.js').then(async ({ updateVersionProcessor }) => { diff --git a/integration/update-version_test.js b/integration/update-version_test.js index 949df58717ac414bae0288686bf486b7c94a46f5..457c3be02656d6633ab9a98867878a001a1232b0 100644 --- a/integration/update-version_test.js +++ b/integration/update-version_test.js @@ -50,8 +50,10 @@ describe('Updates the version', function () { ) } }) - await import('../src/redis.js').then(({ client, createClient }) => { + await import('../src/redis.js').then(({ switchClient, createClient }) => { pubClient = createClient('pubClient') + const client = createClient('common client') + switchClient(client) return client.flushdb() }) await import('../src/version.js').then(async ({ updateVersionProcessor }) => { @@ -121,7 +123,9 @@ describe('Updates the version', function () { } }) - const { client } = await import('../src/redis.js') + const { switchClient, createClient } = await import('../src/redis.js') + const client = createClient('common client') + switchClient(client) await client.flushdb() // preconfigure redis await Promise.all([ diff --git a/src/index.js b/src/index.js index be93348effc5e86852dae55a21b75577aa57225e..ab4b7b6571468349de70114044eca3af3a98f55f 100644 --- a/src/index.js +++ b/src/index.js @@ -31,15 +31,24 @@ import logger from './logger.js' import fastify from 'fastify' import autoLoad from '@fastify/autoload' -import { getLatestVersion, registerLatestVersionListener } from './version.js' +import { getLatestVersion, registerLatestVersionListener, versionInfo } from './version.js' import { configMap } from './config_map.js' import { warmCache } from './files.js' -import { createClient, isReady, client } from './redis.js' +import { createClient, isReady, switchClient } from './redis.js' import lightship from './lightship.js' const __filename = fileURLToPath(import.meta.url) const __dirname = dirname(__filename) const subClient = createClient('sub client') +const client = createClient('common client') +switchClient(client) + +client.on('connect', async () => { + logger.debug('[Redis] Common client connected') + // forget local version and re-fetch from redis next time + versionInfo.version = null + await waitForVersionAvailable() +}) // Load env vars from .env and .env.defaults files // Note: actual env vars supersede .env file and .env file supersedes .env.defaults file @@ -54,7 +63,7 @@ async function waitForVersionAvailable () { } } -lightship.queueBlockingTask(isReady()) +lightship.queueBlockingTask(isReady(client)) lightship.queueBlockingTask(configMap.load()) lightship.queueBlockingTask(waitForVersionAvailable()) diff --git a/src/redis.js b/src/redis.js index 1899d91bc4227883658a81e08bac118ca8b4b6fb..db05f29b179c30eb340c2b8ca5fd0d314e065e1b 100644 --- a/src/redis.js +++ b/src/redis.js @@ -55,19 +55,29 @@ export function createClient (id, options = commonQueueOptions) { : new Redis(options) client.on('ready', () => logger.info(`[Redis] Connected ${id} to redis on ${process.env.REDIS_HOSTS}`)) - client.on('error', (err) => logger.error(`[Redis] Connect error: ${err}`)) + client.on('error', (err) => logger.error(`[Redis client (${id})] Connect error: ${err}`)) return client } -export async function isReady () { +export async function isReady (client) { return new Promise(resolve => { client.on('ready', () => resolve(true)) client.on('error', () => resolve(false)) }).catch(() => false) } -export const client = createClient('common client', { maxRetriesPerRequest: 1 }) +let clientInstance = null +export function switchClient (client) { + clientInstance = client +} + +export const client = new Proxy({}, { + get (target, prop) { + if (clientInstance) return clientInstance[prop] + throw new Error('Redis client not initialized') + } +}) export function isEnabled () { return !!process.env.REDIS_HOST diff --git a/src/updater.js b/src/updater.js index 036101910b64f66cff94b036375618591eca25b4..fc5c64d4e9874e041edc6e17b08c065e935bb4cb 100644 --- a/src/updater.js +++ b/src/updater.js @@ -21,18 +21,26 @@ */ import { config } from 'dotenv-defaults' -import { createClient, client, isReady } from './redis.js' +import { createClient, isReady, switchClient } from './redis.js' import logger from './logger.js' import fastify from 'fastify' import autoLoad from '@fastify/autoload' import { fileURLToPath } from 'node:url' import { join, dirname } from 'node:path' import lightship from './lightship.js' -import { updateVersionProcessor } from './version.js' +import { updateVersionProcessor, versionInfo } from './version.js' const __filename = fileURLToPath(import.meta.url) const __dirname = dirname(__filename) const pubClient = createClient('pub client') +const client = createClient('cache writer', { maxRetriesPerRequest: 1 }) +switchClient(client) + +client.on('connect', async () => { + logger.debug('[Redis] cache writer connected, forcing cache update') + // forget local version and make sure to run warmCache next time + versionInfo.version = 'unknown' +}) config() @@ -45,7 +53,7 @@ async function runUpdate () { } runUpdate() -lightship.queueBlockingTask(isReady()) +lightship.queueBlockingTask(isReady(client)) const app = fastify({}) app.register(autoLoad, { dir: join(__dirname, 'plugins') }) diff --git a/src/version.js b/src/version.js index 85da0530bf7ba08081cc397abad2ed8f4836a056..d9005fb37cfcd8bf3d6d7f31694e71edd6fc68df 100644 --- a/src/version.js +++ b/src/version.js @@ -186,10 +186,10 @@ export async function updateVersionProcessor (pubClient) { const stringifiedVersionInfo = JSON.stringify(versionInfo) cache.clear() - await warmCache({ version: versionInfo.version, fetchFiles: true }) + await warmCache({ version: fetchedVersionInfo.version, fetchFiles: true }) await redis.client.set(getRedisKey({ name: 'versionInfo' }), stringifiedVersionInfo) - await cache.get(getRedisKey({ version: versionInfo.version, name: 'mergedMetadata' }), async () => [await fetchMergedMetadata()]) - versionUpdateGauge.setToCurrentTime({ version: versionInfo.version }) + await cache.get(getRedisKey({ version: fetchedVersionInfo.version, name: 'mergedMetadata' }), async () => [await fetchMergedMetadata()]) + versionUpdateGauge.setToCurrentTime({ version: fetchVersionInfo.version }) logger.info('[Version] publish update to other nodes.') pubClient.publish(getRedisKey({ name: 'updateVersionInfo' }), stringifiedVersionInfo) } else {