From 1e391b4582e15ab8ea386babfbf89222210b0df5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Julian=20B=C3=A4ume?= <julian.baeume@open-xchange.com> Date: Thu, 12 Oct 2023 17:13:30 +0200 Subject: [PATCH] Change: Reconnect common redis client if redis is not reachable since quite some time we do not report the pod as unhealthy any longer if redis connection is gone. So in case the connection is lost, we want it to reconnect. Make sure to always fetch from redis once client (re-)connected. --- integration/caching_test.js | 4 +++- integration/config_test.js | 4 +++- integration/update-version_test.js | 8 ++++++-- src/index.js | 15 ++++++++++++--- src/redis.js | 16 +++++++++++++--- src/updater.js | 14 +++++++++++--- src/version.js | 6 +++--- 7 files changed, 51 insertions(+), 16 deletions(-) diff --git a/integration/caching_test.js b/integration/caching_test.js index 2ee477b..d92df15 100644 --- a/integration/caching_test.js +++ b/integration/caching_test.js @@ -39,8 +39,10 @@ describe('File caching service', function () { '/index.html': () => new Response('<html><head></head><body>it\'s me</body></html>', { headers: { 'content-type': 'text/html' } }) } }) - await import('../src/redis.js').then(({ client, createClient }) => { + await import('../src/redis.js').then(({ switchClient, createClient }) => { pubClient = createClient() + const client = createClient('common client') + switchClient(client) return client.flushdb() }) await import('../src/version.js').then(async ({ updateVersionProcessor }) => { diff --git a/integration/config_test.js b/integration/config_test.js index 66fb7dc..e8f60fb 100644 --- a/integration/config_test.js +++ b/integration/config_test.js @@ -46,8 +46,10 @@ describe('Configuration', function () { ) } }) - await import('../src/redis.js').then(({ client, createClient }) => { + await import('../src/redis.js').then(({ switchClient, createClient }) => { pubClient = createClient() + const client = createClient('common client') + switchClient(client) return client.flushdb() }) await import('../src/version.js').then(async ({ updateVersionProcessor }) => { diff --git a/integration/update-version_test.js b/integration/update-version_test.js index 949df58..457c3be 100644 --- a/integration/update-version_test.js +++ b/integration/update-version_test.js @@ -50,8 +50,10 @@ describe('Updates the version', function () { ) } }) - await import('../src/redis.js').then(({ client, createClient }) => { + await import('../src/redis.js').then(({ switchClient, createClient }) => { pubClient = createClient('pubClient') + const client = createClient('common client') + switchClient(client) return client.flushdb() }) await import('../src/version.js').then(async ({ updateVersionProcessor }) => { @@ -121,7 +123,9 @@ describe('Updates the version', function () { } }) - const { client } = await import('../src/redis.js') + const { switchClient, createClient } = await import('../src/redis.js') + const client = createClient('common client') + switchClient(client) await client.flushdb() // preconfigure redis await Promise.all([ diff --git a/src/index.js b/src/index.js index be93348..ab4b7b6 100644 --- a/src/index.js +++ b/src/index.js @@ -31,15 +31,24 @@ import logger from './logger.js' import fastify from 'fastify' import autoLoad from '@fastify/autoload' -import { getLatestVersion, registerLatestVersionListener } from './version.js' +import { getLatestVersion, registerLatestVersionListener, versionInfo } from './version.js' import { configMap } from './config_map.js' import { warmCache } from './files.js' -import { createClient, isReady, client } from './redis.js' +import { createClient, isReady, switchClient } from './redis.js' import lightship from './lightship.js' const __filename = fileURLToPath(import.meta.url) const __dirname = dirname(__filename) const subClient = createClient('sub client') +const client = createClient('common client') +switchClient(client) + +client.on('connect', async () => { + logger.debug('[Redis] Common client connected') + // forget local version and re-fetch from redis next time + versionInfo.version = null + await waitForVersionAvailable() +}) // Load env vars from .env and .env.defaults files // Note: actual env vars supersede .env file and .env file supersedes .env.defaults file @@ -54,7 +63,7 @@ async function waitForVersionAvailable () { } } -lightship.queueBlockingTask(isReady()) +lightship.queueBlockingTask(isReady(client)) lightship.queueBlockingTask(configMap.load()) lightship.queueBlockingTask(waitForVersionAvailable()) diff --git a/src/redis.js b/src/redis.js index 1899d91..db05f29 100644 --- a/src/redis.js +++ b/src/redis.js @@ -55,19 +55,29 @@ export function createClient (id, options = commonQueueOptions) { : new Redis(options) client.on('ready', () => logger.info(`[Redis] Connected ${id} to redis on ${process.env.REDIS_HOSTS}`)) - client.on('error', (err) => logger.error(`[Redis] Connect error: ${err}`)) + client.on('error', (err) => logger.error(`[Redis client (${id})] Connect error: ${err}`)) return client } -export async function isReady () { +export async function isReady (client) { return new Promise(resolve => { client.on('ready', () => resolve(true)) client.on('error', () => resolve(false)) }).catch(() => false) } -export const client = createClient('common client', { maxRetriesPerRequest: 1 }) +let clientInstance = null +export function switchClient (client) { + clientInstance = client +} + +export const client = new Proxy({}, { + get (target, prop) { + if (clientInstance) return clientInstance[prop] + throw new Error('Redis client not initialized') + } +}) export function isEnabled () { return !!process.env.REDIS_HOST diff --git a/src/updater.js b/src/updater.js index 0361019..fc5c64d 100644 --- a/src/updater.js +++ b/src/updater.js @@ -21,18 +21,26 @@ */ import { config } from 'dotenv-defaults' -import { createClient, client, isReady } from './redis.js' +import { createClient, isReady, switchClient } from './redis.js' import logger from './logger.js' import fastify from 'fastify' import autoLoad from '@fastify/autoload' import { fileURLToPath } from 'node:url' import { join, dirname } from 'node:path' import lightship from './lightship.js' -import { updateVersionProcessor } from './version.js' +import { updateVersionProcessor, versionInfo } from './version.js' const __filename = fileURLToPath(import.meta.url) const __dirname = dirname(__filename) const pubClient = createClient('pub client') +const client = createClient('cache writer', { maxRetriesPerRequest: 1 }) +switchClient(client) + +client.on('connect', async () => { + logger.debug('[Redis] cache writer connected, forcing cache update') + // forget local version and make sure to run warmCache next time + versionInfo.version = 'unknown' +}) config() @@ -45,7 +53,7 @@ async function runUpdate () { } runUpdate() -lightship.queueBlockingTask(isReady()) +lightship.queueBlockingTask(isReady(client)) const app = fastify({}) app.register(autoLoad, { dir: join(__dirname, 'plugins') }) diff --git a/src/version.js b/src/version.js index 85da053..d9005fb 100644 --- a/src/version.js +++ b/src/version.js @@ -186,10 +186,10 @@ export async function updateVersionProcessor (pubClient) { const stringifiedVersionInfo = JSON.stringify(versionInfo) cache.clear() - await warmCache({ version: versionInfo.version, fetchFiles: true }) + await warmCache({ version: fetchedVersionInfo.version, fetchFiles: true }) await redis.client.set(getRedisKey({ name: 'versionInfo' }), stringifiedVersionInfo) - await cache.get(getRedisKey({ version: versionInfo.version, name: 'mergedMetadata' }), async () => [await fetchMergedMetadata()]) - versionUpdateGauge.setToCurrentTime({ version: versionInfo.version }) + await cache.get(getRedisKey({ version: fetchedVersionInfo.version, name: 'mergedMetadata' }), async () => [await fetchMergedMetadata()]) + versionUpdateGauge.setToCurrentTime({ version: fetchVersionInfo.version }) logger.info('[Version] publish update to other nodes.') pubClient.publish(getRedisKey({ name: 'updateVersionInfo' }), stringifiedVersionInfo) } else { -- GitLab