import Redis from 'ioredis' import { logger, timeSinceStartup } from './logger.js' import Queue from 'bull' import { registerLatestVersionListener, updateVersionProcessor } from './version.js' const commonQueueOptions = { enableReadyCheck: false, maxRetriesPerRequest: null } const createClient = (type, options = {}) => { if (!isEnabled()) { return new Proxy({ getBuffer () {}, get () {}, set () {}, del () {}, flushdb () { return {} }, status: '', duplicate () { return new Redis() }, publish () {}, subscribe () {}, on () {}, quit () { } }, { get () { throw new Error('Redis is disabled. Check for redis.isEnabled()') } }) } const client = new Redis({ host: process.env.REDIS_HOST, port: Number(process.env.REDIS_PORT), db: Number(process.env.REDIS_DB), password: process.env.REDIS_PASSWORD, ...options }) client.on('ready', () => logger.info(`[Redis] Connected ${type} to redis on ${process.env.REDIS_HOST}. Time since startup: ${timeSinceStartup()}s`)) client.on('error', (err) => logger.error(`[Redis] Connect error: ${err}`)) return client } export async function isReady () { return new Promise(resolve => { if (!isEnabled()) return resolve(true) client.on('ready', () => resolve(true)) client.on('error', () => resolve(false)) }).catch(() => false) } export const client = createClient('common client') export const pubClient = createClient('pub client') export const subClient = createClient('sub client', commonQueueOptions) /* * Bull specific things are below */ if (isEnabled()) { client.on('ready', () => { const updateVersionQueue = getQueue('update-version') try { updateVersionQueue.process(updateVersionProcessor) logger.debug('[Redis] Register version update processor.') } catch (err) { logger.debug('[Redis] Update version processor is already registered.') } updateVersionQueue.add({}, { jobId: 'update-version-job', repeat: { every: Number(process.env.CACHE_TTL) }, removeOnComplete: 10, removeOnFail: 10, timeout: 10000 }) }) registerLatestVersionListener(subClient) } else { setInterval(updateVersionProcessor, Number(process.env.CACHE_TTL)) } /* * queue specific code */ const queues = {} export function getQueue (name) { if (queues[name]) return queues[name] // @ts-ignore return (queues[name] = new Queue(name, { prefix: process.env.REDIS_PREFIX, createClient: function (type) { switch (type) { case 'client': return client.duplicate() case 'subscriber': return subClient.duplicate() default: return client.duplicate(commonQueueOptions) } } })) } export function getQueues () { return Object.values(queues) } export async function closeQueue (name) { if (!queues[name]) throw new Error(`No such queue "${name}" to close.`) const queue = queues[name] delete queues[name] const jobs = await queue.getRepeatableJobs() for (const job of jobs) { queue.removeRepeatableByKey(job.key) } return queue.close() } export function isEnabled () { return !!process.env.REDIS_HOST }