Skip to content
Snippets Groups Projects
Commit 1e391b45 authored by julian.baeume's avatar julian.baeume :pick:
Browse files

Change: Reconnect common redis client if redis is not reachable

since quite some time we do not report the pod as unhealthy any longer
if redis connection is gone. So in case the connection is lost, we want it
to reconnect.

Make sure to always fetch from redis once client (re-)connected.
parent e69cd19c
No related branches found
No related tags found
No related merge requests found
......@@ -39,8 +39,10 @@ describe('File caching service', function () {
'/index.html': () => new Response('<html><head></head><body>it\'s me</body></html>', { headers: { 'content-type': 'text/html' } })
}
})
await import('../src/redis.js').then(({ client, createClient }) => {
await import('../src/redis.js').then(({ switchClient, createClient }) => {
pubClient = createClient()
const client = createClient('common client')
switchClient(client)
return client.flushdb()
})
await import('../src/version.js').then(async ({ updateVersionProcessor }) => {
......
......@@ -46,8 +46,10 @@ describe('Configuration', function () {
)
}
})
await import('../src/redis.js').then(({ client, createClient }) => {
await import('../src/redis.js').then(({ switchClient, createClient }) => {
pubClient = createClient()
const client = createClient('common client')
switchClient(client)
return client.flushdb()
})
await import('../src/version.js').then(async ({ updateVersionProcessor }) => {
......
......@@ -50,8 +50,10 @@ describe('Updates the version', function () {
)
}
})
await import('../src/redis.js').then(({ client, createClient }) => {
await import('../src/redis.js').then(({ switchClient, createClient }) => {
pubClient = createClient('pubClient')
const client = createClient('common client')
switchClient(client)
return client.flushdb()
})
await import('../src/version.js').then(async ({ updateVersionProcessor }) => {
......@@ -121,7 +123,9 @@ describe('Updates the version', function () {
}
})
const { client } = await import('../src/redis.js')
const { switchClient, createClient } = await import('../src/redis.js')
const client = createClient('common client')
switchClient(client)
await client.flushdb()
// preconfigure redis
await Promise.all([
......
......@@ -31,15 +31,24 @@ import logger from './logger.js'
import fastify from 'fastify'
import autoLoad from '@fastify/autoload'
import { getLatestVersion, registerLatestVersionListener } from './version.js'
import { getLatestVersion, registerLatestVersionListener, versionInfo } from './version.js'
import { configMap } from './config_map.js'
import { warmCache } from './files.js'
import { createClient, isReady, client } from './redis.js'
import { createClient, isReady, switchClient } from './redis.js'
import lightship from './lightship.js'
const __filename = fileURLToPath(import.meta.url)
const __dirname = dirname(__filename)
const subClient = createClient('sub client')
const client = createClient('common client')
switchClient(client)
client.on('connect', async () => {
logger.debug('[Redis] Common client connected')
// forget local version and re-fetch from redis next time
versionInfo.version = null
await waitForVersionAvailable()
})
// Load env vars from .env and .env.defaults files
// Note: actual env vars supersede .env file and .env file supersedes .env.defaults file
......@@ -54,7 +63,7 @@ async function waitForVersionAvailable () {
}
}
lightship.queueBlockingTask(isReady())
lightship.queueBlockingTask(isReady(client))
lightship.queueBlockingTask(configMap.load())
lightship.queueBlockingTask(waitForVersionAvailable())
......
......@@ -55,19 +55,29 @@ export function createClient (id, options = commonQueueOptions) {
: new Redis(options)
client.on('ready', () => logger.info(`[Redis] Connected ${id} to redis on ${process.env.REDIS_HOSTS}`))
client.on('error', (err) => logger.error(`[Redis] Connect error: ${err}`))
client.on('error', (err) => logger.error(`[Redis client (${id})] Connect error: ${err}`))
return client
}
export async function isReady () {
export async function isReady (client) {
return new Promise(resolve => {
client.on('ready', () => resolve(true))
client.on('error', () => resolve(false))
}).catch(() => false)
}
export const client = createClient('common client', { maxRetriesPerRequest: 1 })
let clientInstance = null
export function switchClient (client) {
clientInstance = client
}
export const client = new Proxy({}, {
get (target, prop) {
if (clientInstance) return clientInstance[prop]
throw new Error('Redis client not initialized')
}
})
export function isEnabled () {
return !!process.env.REDIS_HOST
......
......@@ -21,18 +21,26 @@
*/
import { config } from 'dotenv-defaults'
import { createClient, client, isReady } from './redis.js'
import { createClient, isReady, switchClient } from './redis.js'
import logger from './logger.js'
import fastify from 'fastify'
import autoLoad from '@fastify/autoload'
import { fileURLToPath } from 'node:url'
import { join, dirname } from 'node:path'
import lightship from './lightship.js'
import { updateVersionProcessor } from './version.js'
import { updateVersionProcessor, versionInfo } from './version.js'
const __filename = fileURLToPath(import.meta.url)
const __dirname = dirname(__filename)
const pubClient = createClient('pub client')
const client = createClient('cache writer', { maxRetriesPerRequest: 1 })
switchClient(client)
client.on('connect', async () => {
logger.debug('[Redis] cache writer connected, forcing cache update')
// forget local version and make sure to run warmCache next time
versionInfo.version = 'unknown'
})
config()
......@@ -45,7 +53,7 @@ async function runUpdate () {
}
runUpdate()
lightship.queueBlockingTask(isReady())
lightship.queueBlockingTask(isReady(client))
const app = fastify({})
app.register(autoLoad, { dir: join(__dirname, 'plugins') })
......
......@@ -186,10 +186,10 @@ export async function updateVersionProcessor (pubClient) {
const stringifiedVersionInfo = JSON.stringify(versionInfo)
cache.clear()
await warmCache({ version: versionInfo.version, fetchFiles: true })
await warmCache({ version: fetchedVersionInfo.version, fetchFiles: true })
await redis.client.set(getRedisKey({ name: 'versionInfo' }), stringifiedVersionInfo)
await cache.get(getRedisKey({ version: versionInfo.version, name: 'mergedMetadata' }), async () => [await fetchMergedMetadata()])
versionUpdateGauge.setToCurrentTime({ version: versionInfo.version })
await cache.get(getRedisKey({ version: fetchedVersionInfo.version, name: 'mergedMetadata' }), async () => [await fetchMergedMetadata()])
versionUpdateGauge.setToCurrentTime({ version: fetchVersionInfo.version })
logger.info('[Version] publish update to other nodes.')
pubClient.publish(getRedisKey({ name: 'updateVersionInfo' }), stringifiedVersionInfo)
} else {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment