Skip to content
Snippets Groups Projects
redis.js 3.03 KiB
Newer Older
import Redis from 'ioredis'
import Queue from 'bull'
import { registerLatestVersionListener, updateVersionProcessor } from './version.js'

const commonQueueOptions = { enableReadyCheck: false, maxRetriesPerRequest: null }

const hosts = (process.env.REDIS_HOSTS || '').split(',').map(host => {
  const [hostname, port] = host.split(':')
  return { host: hostname, port: Number(port) }
})

function createClient (id, options = {}) {
  options = {
    password: process.env.REDIS_PASSWORD,
    ...options
  }

  if (process.env.REDIS_MODE === 'sentinel') {
    options = {
      sentinels: hosts,
      name: process.env.REDIS_SENTINEL_MASTER_ID,
      ...options
    }
  } else if (process.env.REDIS_MODE === 'standalone') {
    options = {
      ...hosts[0],
      ...options
    }
  }
  const client = process.env.REDIS_MODE === 'cluster'
    ? new Redis.Cluster(hosts, { redisOptions: options })
    : new Redis(options)

  client.on('ready', () => logger.info(`[Redis] Connected ${id} to redis on ${process.env.REDIS_HOSTS}`))
  client.on('error', (err) => logger.error(`[Redis] Connect error: ${err}`))

  return client
}

export async function isReady () {
  return new Promise(resolve => {
    client.on('ready', () => resolve(true))
    client.on('error', () => resolve(false))
  }).catch(() => false)
export const client = createClient('common client', { maxRetriesPerRequest: 1 })
export const pubClient = createClient('pub client')
export const subClient = createClient('sub client', commonQueueOptions)

/*
 * Bull specific things are below
 */
client.on('ready', () => {
  const updateVersionQueue = getQueue('update-version')
  try {
    updateVersionQueue.process(updateVersionProcessor)
    logger.debug('[Redis] Register version update processor.')
  } catch (err) {
    logger.debug('[Redis] Update version processor is already registered.')
  }
  updateVersionQueue.add({}, {
    jobId: 'update-version-job',
    repeat: { every: Number(process.env.CACHE_TTL) },
    removeOnComplete: 10,
    removeOnFail: 10,
    timeout: 10000
registerLatestVersionListener(subClient)
const queues = {}

export function getQueue (name) {
  if (queues[name]) return queues[name]
  return (queues[name] = new Queue(name, {
    prefix: process.env.REDIS_PREFIX,
    createClient: function (type) {
      switch (type) {
        case 'client':
        case 'subscriber':
          return subClient.duplicate()
        default:
          return client.duplicate(commonQueueOptions)
      }
    }
  }))
}

export function getQueues () {
  return Object.values(queues)
}

export async function closeQueue (name) {
  if (!queues[name]) throw new Error(`No such queue "${name}" to close.`)
  const queue = queues[name]
  delete queues[name]
  const jobs = await queue.getRepeatableJobs()
  for (const job of jobs) {
    queue.removeRepeatableByKey(job.key)
  }
  return queue.close()
}

export function isEnabled () {
  return !!process.env.REDIS_HOST
}