/*
 *
 * @copyright Copyright (c) OX Software GmbH, Germany <info@open-xchange.com>
 * @license AGPL-3.0
 *
 * This code is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU Affero General Public License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with OX App Suite. If not, see <https://www.gnu.org/licenses/agpl-3.0.txt>.
 *
 * Any use of the work other than as authorized under this license or copyright law is prohibited.
 *
 */

import Redis from 'ioredis'
import logger from './logger.js'
import Queue from 'bull'
import { registerLatestVersionListener, updateVersionProcessor } from './version.js'

// Connection options used for the subscriber client and for the additional
// connections handed to Bull (see getQueue).
const commonQueueOptions = { enableReadyCheck: false, maxRetriesPerRequest: null }

// REDIS_HOSTS is a comma separated list of "hostname:port" entries.
const hosts = (process.env.REDIS_HOSTS || '').split(',').map(host => {
  const [hostname, port] = host.split(':')
  return { host: hostname, port: Number(port) }
})

/**
 * Creates a Redis connection according to REDIS_MODE
 * ("sentinel", "standalone" or "cluster").
 *
 * Precedence of the resulting options (later wins): mode specific defaults,
 * then `db`/`password` from the environment, then the caller supplied `options`.
 *
 * @param {string} id - Label used in the log output for this connection.
 * @param {object} [options] - Additional ioredis options.
 * @returns {object} The created ioredis client (a Cluster instance in cluster mode).
 */
function createClient (id, options = {}) {
  const mode = process.env.REDIS_MODE

  let redisOptions = {
    db: Number(process.env.REDIS_DB),
    password: process.env.REDIS_PASSWORD,
    ...options
  }

  if (mode === 'sentinel') {
    redisOptions = {
      sentinels: hosts,
      name: process.env.REDIS_SENTINEL_MASTER_ID,
      ...redisOptions
    }
  } else if (mode === 'standalone') {
    redisOptions = { ...hosts[0], ...redisOptions }
  }

  const connection = mode === 'cluster'
    ? new Redis.Cluster(hosts, { redisOptions })
    : new Redis(redisOptions)

  connection.on('ready', () => logger.info(`[Redis] Connected ${id} to redis on ${process.env.REDIS_HOSTS}`))
  connection.on('error', (err) => logger.error(`[Redis] Connect error: ${err}`))

  return connection
}

/**
 * Resolves once the state of the common client is known.
 *
 * Resolves immediately with `true` if the client is already connected.
 * Otherwise it waits for the first "ready" (-> true) or "error" (-> false)
 * event. The temporary listeners are detached afterwards, so repeated calls
 * do not accumulate listeners on the client.
 *
 * @returns {Promise<boolean>} Whether the common client is ready. Never rejects.
 */
export async function isReady () {
  // An established connection does not emit "ready" again, so waiting for
  // the event alone would leave the promise pending.
  if (client.status === 'ready') return true

  return new Promise(resolve => {
    const settle = (result) => {
      client.removeListener('ready', onReady)
      client.removeListener('error', onError)
      resolve(result)
    }
    const onReady = () => settle(true)
    const onError = () => settle(false)

    client.on('ready', onReady)
    client.on('error', onError)
  })
}

export const client = createClient('common client', { maxRetriesPerRequest: 1 })
export const pubClient = createClient('pub client')
export const subClient = createClient('sub client', commonQueueOptions)

/*
 * Bull specific things are below
 */

// Registers the version update processor and its repeatable job whenever the
// common client reports "ready".
client.on('ready', () => {
  const updateVersionQueue = getQueue('update-version')

  try {
    // Promise.resolve guards against a non-promise return value. The rejection
    // handler keeps asynchronous failures from becoming unhandled rejections.
    Promise.resolve(updateVersionQueue.process(updateVersionProcessor))
      .catch(err => logger.error(`[Redis] Version update processor failed: ${err}`))
    logger.debug('[Redis] Register version update processor.')
  } catch (err) {
    logger.debug('[Redis] Update version processor is already registered.')
  }

  updateVersionQueue
    .add({}, {
      jobId: 'update-version-job',
      repeat: { every: Number(process.env.CACHE_TTL) },
      removeOnComplete: 10,
      removeOnFail: 10,
      timeout: 10000
    })
    .catch(err => logger.error(`[Redis] Could not schedule version update job: ${err}`))
})

registerLatestVersionListener(subClient)

/*
 * queue specific code
 */

// Cache of created queues, keyed by queue name.
const queues = {}

/**
 * Returns the Bull queue with the given name, creating and caching it on
 * first use. The queue's connections are duplicates of the module's clients.
 *
 * @param {string} name - Name of the queue.
 * @returns {object} The Bull queue instance.
 */
export function getQueue (name) {
  if (queues[name]) return queues[name]

  const queue = new Queue(name, {
    prefix: process.env.REDIS_PREFIX,
    createClient (type) {
      if (type === 'client') return client.duplicate()
      if (type === 'subscriber') return subClient.duplicate()
      // every other connection type gets the common queue options
      return client.duplicate(commonQueueOptions)
    }
  })

  queues[name] = queue
  return queue
}

/**
 * @returns {object[]} All queues that are currently cached.
 */
export function getQueues () {
  return Object.values(queues)
}

/**
 * Removes all repeatable jobs of a queue, closes it and drops it from the cache.
 *
 * @param {string} name - Name of the queue to close.
 * @returns {Promise<*>} Settles with the result of closing the queue.
 * @throws {Error} (as rejection) If no queue with that name has been created.
 */
export async function closeQueue (name) {
  const queue = queues[name]
  if (!queue) throw new Error(`No such queue "${name}" to close.`)
  delete queues[name]

  const jobs = await queue.getRepeatableJobs()
  // Wait for every removal, so the queue is not closed while removals are
  // still in flight and failures surface to the caller.
  await Promise.all(jobs.map(job => queue.removeRepeatableByKey(job.key)))

  return queue.close()
}

/**
 * Tells whether a Redis host has been configured.
 *
 * The connections of this module are built from REDIS_HOSTS, so that variable
 * is checked. REDIS_HOST is still honoured for backward compatibility.
 *
 * @returns {boolean} True if REDIS_HOSTS or REDIS_HOST is set to a non-empty value.
 */
export function isEnabled () {
  return Boolean(process.env.REDIS_HOSTS || process.env.REDIS_HOST)
}