import Redis from 'ioredis'
import { logger, timeSinceStartup } from './logger.js'
import Queue from 'bull'

// Options applied to the subscriber connection and to the extra connections
// handed to Bull (see Bull's notes on reusing Redis connections).
const commonQueueOptions = {
  enableReadyCheck: false,
  maxRetriesPerRequest: null
}

/**
 * Creates an ioredis connection configured from the REDIS_* environment variables.
 *
 * When Redis is disabled (see isEnabled) no connection is opened; a placeholder
 * object is returned instead that throws on every property read.
 *
 * @param {string} type - Human-readable label, only used in the "ready" log line.
 * @param {object} [options] - Extra ioredis options; they override the env-derived ones.
 * @returns {Redis} The connection, or the throwing placeholder when Redis is disabled.
 */
const createClient = (type, options = {}) => {
  if (!isEnabled()) {
    return new Proxy({}, {
      get () {
        throw new Error('Redis is disabled. Check for redis.isEnabled()')
      }
    })
  }

  const client = new Redis({
    host: process.env.REDIS_HOST,
    port: process.env.REDIS_PORT || 6379,
    db: process.env.REDIS_DB,
    password: process.env.REDIS_PASSWORD,
    ...options
  })

  client.on('ready', () => logger.info(`[Redis] Connected ${type} to redis on ${process.env.REDIS_HOST}. Time since startup: ${timeSinceStartup()}s`))
  client.on('error', (err) => logger.error(`[Redis] Connect error: ${err}`))

  return client
}

/**
 * Resolves when all three shared connections report the "ready" status.
 *
 * @returns {Promise<void>}
 * @throws {Error} If any connection is not ready; the message names the connection
 *   and its current status. When Redis is disabled, the placeholder's own
 *   "Redis is disabled" error is thrown instead.
 */
export async function isReady () {
  const connections = [
    ['common client', client],
    ['pub client', pubClient],
    ['sub client', subClient]
  ]

  for (const [label, connection] of connections) {
    const { status } = connection
    if (status !== 'ready') {
      throw new Error(`[Redis] ${label} is not ready (status: ${status})`)
    }
  }
}

export const client = createClient('common client')
export const pubClient = createClient('pub client')
export const subClient = createClient('sub client', commonQueueOptions)

/*
 * Bull specific things are below
 */

// Registry of queues created through getQueue, keyed by queue name.
// Prototype-less so that names such as "constructor" or "toString" are not
// mistaken for already-registered queues.
const queues = Object.create(null)

/**
 * Returns the Bull queue registered under `name`, creating it on first use.
 *
 * Connections handed to Bull: the shared `client` for type "client", a duplicate
 * of `subClient` for "subscriber", and a duplicate of `client` with
 * `commonQueueOptions` for any other type.
 *
 * @param {string} name - Queue name.
 * @returns {Queue} The cached or newly created queue.
 */
export function getQueue (name) {
  if (queues[name]) return queues[name]

  const queue = new Queue(name, {
    prefix: process.env.REDIS_PREFIX,
    createClient: function (type) {
      switch (type) {
        case 'client':
          return client
        case 'subscriber':
          return subClient.duplicate()
        default:
          return client.duplicate(commonQueueOptions)
      }
    }
  })

  queues[name] = queue
  return queue
}

/**
 * Lists every queue currently held in the registry.
 *
 * @returns {Queue[]}
 */
export function getQueues () {
  return Object.values(queues)
}

/**
 * Unregisters the queue `name`, removes its repeatable jobs and closes it.
 *
 * @param {string} name - Name of a queue previously created through getQueue.
 * @returns {Promise<*>} Whatever `queue.close()` resolves to.
 * @throws {Error} If no queue is registered under `name`.
 */
export async function closeQueue (name) {
  if (!queues[name]) throw new Error(`No such queue "${name}" to close.`)

  const queue = queues[name]
  delete queues[name]

  const jobs = await queue.getRepeatableJobs()
  for (const job of jobs) {
    // Wait for each removal so the queue is not closed while removals are
    // still in flight, and so a failed removal rejects this call instead of
    // surfacing as an unhandled rejection.
    await queue.removeRepeatableByKey(job.key)
  }

  return queue.close()
}

/**
 * Tells whether Redis is configured, i.e. whether REDIS_HOST is set to a
 * non-empty value.
 *
 * @returns {boolean}
 */
export function isEnabled () {
  return !!process.env.REDIS_HOST
}