-
richard.petersen authored
Root cause: Circular dependency causes undefined function Solution: As the registration of the listener can be async, just use a simple setTimeout
richard.petersen authoredRoot cause: Circular dependency causes undefined function Solution: As the registration of the listener can be async, just use a simple setTimeout
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
redis.js 3.20 KiB
import Redis from 'ioredis'
import { logger, timeSinceStartup } from './logger.js'
import Queue from 'bull'
import { registerLatestVersionListener, updateVersionProcessor } from './version.js'
const commonQueueOptions = { enableReadyCheck: false, maxRetriesPerRequest: null }
const createClient = (type, options = {}) => {
if (!isEnabled()) {
return new Proxy({
getBuffer () {},
get () {},
set () {},
del () {},
flushdb () { return {} },
status: '',
duplicate () { return new Redis() },
publish () {},
subscribe () {},
on () {},
quit () { }
}, {
get () {
throw new Error('Redis is disabled. Check for redis.isEnabled()')
}
})
}
const client = new Redis({
host: process.env.REDIS_HOST,
port: Number(process.env.REDIS_PORT),
db: Number(process.env.REDIS_DB),
password: process.env.REDIS_PASSWORD,
...options
})
client.on('ready', () => logger.info(`[Redis] Connected ${type} to redis on ${process.env.REDIS_HOST}. Time since startup: ${timeSinceStartup()}s`))
client.on('error', (err) => logger.error(`[Redis] Connect error: ${err}`))
return client
}
export async function isReady () {
return new Promise(resolve => {
if (!isEnabled()) return resolve(true)
client.on('ready', () => resolve(true))
client.on('error', () => resolve(false))
}).catch(() => false)
}
export const client = createClient('common client')
export const pubClient = createClient('pub client')
export const subClient = createClient('sub client', commonQueueOptions)
/*
* Bull specific things are below
*/
if (isEnabled()) {
client.on('ready', () => {
const updateVersionQueue = getQueue('update-version')
try {
updateVersionQueue.process(updateVersionProcessor)
logger.debug('[Redis] Register version update processor.')
} catch (err) {
logger.debug('[Redis] Update version processor is already registered.')
}
updateVersionQueue.add({}, {
jobId: 'update-version-job',
repeat: { every: Number(process.env.CACHE_TTL) },
removeOnComplete: 10,
removeOnFail: 10,
timeout: 10000
})
})
registerLatestVersionListener(subClient)
} else {
setTimeout(() => {
setInterval(updateVersionProcessor, Number(process.env.CACHE_TTL))
})
}
/*
* queue specific code
*/
const queues = {}
export function getQueue (name) {
if (queues[name]) return queues[name]
// @ts-ignore
return (queues[name] = new Queue(name, {
prefix: process.env.REDIS_PREFIX,
createClient: function (type) {
switch (type) {
case 'client':
return client.duplicate()
case 'subscriber':
return subClient.duplicate()
default:
return client.duplicate(commonQueueOptions)
}
}
}))
}
export function getQueues () {
return Object.values(queues)
}
export async function closeQueue (name) {
if (!queues[name]) throw new Error(`No such queue "${name}" to close.`)
const queue = queues[name]
delete queues[name]
const jobs = await queue.getRepeatableJobs()
for (const job of jobs) {
queue.removeRepeatableByKey(job.key)
}
return queue.close()
}
export function isEnabled () {
return !!process.env.REDIS_HOST
}