Newer
Older
import logger from './logger.js'

richard.petersen
committed
import { registerLatestVersionListener, updateVersionProcessor } from './version.js'
// Connection options shared by the subscriber connection and by the extra
// connections handed out to queues.
const commonQueueOptions = { enableReadyCheck: false, maxRetriesPerRequest: null }

// REDIS_HOSTS is a comma-separated list of "host:port" entries.
// Note: when the variable is unset this still yields one entry with an empty
// host, and an entry without ":port" gets a NaN port.
const rawHosts = process.env.REDIS_HOSTS || ''
const hosts = rawHosts.split(',').map(entry => {
  const parts = entry.split(':')
  return { host: parts[0], port: Number(parts[1]) }
})
/**
 * Builds a Redis connection according to REDIS_MODE and wires up logging.
 *
 * Option precedence (lowest to highest): mode specific settings, then
 * REDIS_DB / REDIS_PASSWORD, then the caller supplied `options`.
 *
 * @param {string} id - label used in the "connected" log line
 * @param {object} [options] - extra connection options, override everything else
 * @returns the created connection (a cluster connection when REDIS_MODE is 'cluster')
 */
function createClient (id, options = {}) {
  const mode = process.env.REDIS_MODE

  const base = {
    db: Number(process.env.REDIS_DB),
    password: process.env.REDIS_PASSWORD,
    ...options
  }

  // 'sentinel' and 'standalone' prepend their addressing info; any other mode
  // (including 'cluster') uses the base settings unchanged.
  let settings = base
  if (mode === 'sentinel') {
    settings = {
      sentinels: hosts,
      name: process.env.REDIS_SENTINEL_MASTER_ID,
      ...base
    }
  } else if (mode === 'standalone') {
    settings = { ...hosts[0], ...base }
  }

  let connection
  if (mode === 'cluster') {
    connection = new Redis.Cluster(hosts, { redisOptions: settings })
  } else {
    connection = new Redis(settings)
  }

  connection.on('ready', () => {
    logger.info(`[Redis] Connected ${id} to redis on ${process.env.REDIS_HOSTS}`)
  })
  connection.on('error', (err) => {
    logger.error(`[Redis] Connect error: ${err}`)
  })

  return connection
}
/**
 * Reports whether the shared `client` connection is usable.
 *
 * Resolves to `true` if the client is already ready or emits 'ready' next,
 * and to `false` if it emits 'error' first. Never rejects.
 *
 * @returns {Promise<boolean>}
 */
export async function isReady () {
  return new Promise(resolve => {
    // 'ready' will not be emitted again for a connection that is already up,
    // so answer right away instead of waiting for an event that may never come.
    if (client.status === 'ready') {
      resolve(true)
      return
    }

    // One-shot listeners that remove each other, so repeated calls do not
    // pile up handlers on the shared client.
    const onReady = () => {
      client.off('error', onError)
      resolve(true)
    }
    const onError = () => {
      client.off('ready', onReady)
      resolve(false)
    }
    client.once('ready', onReady)
    client.once('error', onError)
  }).catch(() => false)
}
// Module-wide connections, created eagerly at import time via createClient.
// General purpose connection; gives up on a request after a single retry.
export const client = createClient('common client', { maxRetriesPerRequest: 1 })
// Connection created with the default options only.
// NOTE(review): presumably reserved for publishing; it is not used in this file.
export const pubClient = createClient('pub client')
// Subscriber connection, created with commonQueueOptions (no ready check,
// maxRetriesPerRequest: null).
export const subClient = createClient('sub client', commonQueueOptions)
/*
* Bull specific things are below
*/

richard.petersen
committed
// Whenever the shared client reports 'ready', make sure the version update
// worker is registered and its repeating job is scheduled.
client.on('ready', async () => {
  const updateVersionQueue = getQueue('update-version')

  // Registering the processor throws when it was registered before (e.g. on a
  // later 'ready' event), which is treated as "nothing to do".
  try {
    updateVersionQueue.process(updateVersionProcessor)
    logger.debug('[Redis] Register version update processor.')
  } catch (err) {
    logger.debug('[Redis] Update version processor is already registered.')
  }

  // add() is asynchronous; await it so a failure is logged instead of ending
  // up as an unhandled promise rejection.
  try {
    await updateVersionQueue.add({}, {
      // fixed id so the same job is reused rather than added again
      jobId: 'update-version-job',
      repeat: { every: Number(process.env.CACHE_TTL) },
      removeOnComplete: 10,
      removeOnFail: 10,
      timeout: 10000
    })
  } catch (err) {
    logger.error(`[Redis] Could not schedule version update job: ${err}`)
  }
})

richard.petersen
committed
// Hand the subscriber connection to the version module ('./version.js').
// NOTE(review): presumably this subscribes to version change notifications — confirm in version.js.
registerLatestVersionListener(subClient)

richard.petersen
committed
/*
* queue specific code
*/
// Registry of the queues created so far, keyed by queue name.
const queues = {}

/**
 * Returns the queue registered under `name`, creating and caching it on first use.
 *
 * New queues use REDIS_PREFIX as key prefix and get their connections by
 * duplicating the module's `client` / `subClient`.
 *
 * @param {string} name - queue name
 * @returns the cached or newly created queue
 */
export function getQueue (name) {
  const existing = queues[name]
  if (existing) return existing

  // Picks the connection to duplicate based on the role the queue asks for.
  const connectionFor = (type) => {
    if (type === 'client') return client.duplicate()
    if (type === 'subscriber') return subClient.duplicate()
    return client.duplicate(commonQueueOptions)
  }

  const queue = new Queue(name, {
    prefix: process.env.REDIS_PREFIX,
    createClient: connectionFor
  })
  queues[name] = queue
  return queue
}
/**
 * Lists every queue currently held in the registry.
 *
 * @returns {Array} the registered queues, in registration order of their names
 */
export function getQueues () {
  return Object.keys(queues).map(queueName => queues[queueName])
}
/**
 * Removes a queue from the registry, deletes its repeatable jobs and closes it.
 *
 * @param {string} name - name of a queue previously obtained via getQueue
 * @returns {Promise} settles with the result of closing the queue
 * @throws {Error} if no queue is registered under `name`, or if listing or
 *   removing the repeatable jobs fails
 */
export async function closeQueue (name) {
  const queue = queues[name]
  if (!queue) throw new Error(`No such queue "${name}" to close.`)

  // Unregister first so a concurrent getQueue creates a fresh instance.
  delete queues[name]

  const jobs = await queue.getRepeatableJobs()
  // Wait for every removal before closing: the removals need the queue's
  // connection, and a failed removal must surface to the caller instead of
  // becoming an unhandled rejection.
  await Promise.all(jobs.map(job => queue.removeRepeatableByKey(job.key)))

  return queue.close()
}
/**
 * Tells whether a Redis endpoint has been configured.
 *
 * The connections in this module are configured through REDIS_HOSTS, so that
 * variable is checked; the older REDIS_HOST is still honoured so existing
 * deployments keep working.
 *
 * @returns {boolean} true if REDIS_HOSTS or REDIS_HOST is set to a non-empty value
 */
export function isEnabled () {
  return Boolean(process.env.REDIS_HOSTS || process.env.REDIS_HOST)
}