diff --git a/streaming/index.js b/streaming/index.js index 8b7477a44..74cbf4c2c 100644 --- a/streaming/index.js +++ b/streaming/index.js @@ -63,20 +63,29 @@ const dbUrlToConfig = (dbUrl) => { * @param {Object.} defaultConfig * @param {string} redisUrl */ -const redisUrlToClient = (defaultConfig, redisUrl) => { +const redisUrlToClient = async (defaultConfig, redisUrl) => { const config = defaultConfig; + let client; + if (!redisUrl) { - return redis.createClient(config); + client = redis.createClient(config); + } else if (redisUrl.startsWith('unix://')) { + client = redis.createClient(Object.assign(config, { + socket: { + path: redisUrl.slice(7), + }, + })); + } else { + client = redis.createClient(Object.assign(config, { + url: redisUrl, + })); } - if (redisUrl.startsWith('unix://')) { - return redis.createClient(redisUrl.slice(7), config); - } + client.on('error', (err) => log.error('Redis Client Error!', err)); + await client.connect(); - return redis.createClient(Object.assign(config, { - url: redisUrl, - })); + return client; }; const numWorkers = +process.env.STREAMING_CLUSTER_NUM || (env === 'development' ? 1 : Math.max(os.cpus().length - 1, 1)); @@ -102,7 +111,7 @@ const startMaster = () => { log.warn(`Starting streaming API server master with ${numWorkers} workers`); }; -const startWorker = (workerId) => { +const startWorker = async (workerId) => { log.warn(`Starting worker ${workerId}`); const pgConfigs = { @@ -127,7 +136,7 @@ const startWorker = (workerId) => { if (!!process.env.DB_SSLMODE && process.env.DB_SSLMODE !== 'disable') { pgConfigs.development.ssl = true; - pgConfigs.production.ssl = true; + pgConfigs.production.ssl = true; } const app = express(); @@ -139,9 +148,11 @@ const startWorker = (workerId) => { const redisNamespace = process.env.REDIS_NAMESPACE || null; const redisParams = { - host: process.env.REDIS_HOST || '127.0.0.1', - port: process.env.REDIS_PORT || 6379, - db: process.env.REDIS_DB || 0, + socket: { + host: process.env.REDIS_HOST || '127.0.0.1', + port: process.env.REDIS_PORT || 6379, + }, + database: process.env.REDIS_DB || 0, password: process.env.REDIS_PASSWORD || undefined, }; @@ -151,25 +162,8 @@ const startWorker = (workerId) => { const redisPrefix = redisNamespace ? `${redisNamespace}:` : ''; - const redisSubscribeClient = redisUrlToClient(redisParams, process.env.REDIS_URL); - const redisClient = redisUrlToClient(redisParams, process.env.REDIS_URL); - - /** - * @type {Object.>} - */ - const subs = {}; - - redisSubscribeClient.on('message', (channel, message) => { - const callbacks = subs[channel]; - - log.silly(`New message on channel ${channel}`); - - if (!callbacks) { - return; - } - - callbacks.forEach(callback => callback(message)); - }); + const redisSubscribeClient = await redisUrlToClient(redisParams, process.env.REDIS_URL); + const redisClient = await redisUrlToClient(redisParams, process.env.REDIS_URL); /** * @param {string[]} channels @@ -197,34 +191,16 @@ const startWorker = (workerId) => { */ const subscribe = (channel, callback) => { log.silly(`Adding listener for ${channel}`); - subs[channel] = subs[channel] || []; - if (subs[channel].length === 0) { - log.verbose(`Subscribe ${channel}`); - redisSubscribeClient.subscribe(channel); - } - - subs[channel].push(callback); + redisSubscribeClient.subscribe(channel, callback); }; /** * @param {string} channel - * @param {function(string): void} callback */ - const unsubscribe = (channel, callback) => { - log.silly(`Removing listener for ${channel}`); + const unsubscribe = (channel) => { - if (!subs[channel]) { - return; - } - - subs[channel] = subs[channel].filter(item => item !== callback); - - if (subs[channel].length === 0) { - log.verbose(`Unsubscribe ${channel}`); - redisSubscribeClient.unsubscribe(channel); - delete subs[channel]; - } + redisSubscribeClient.unsubscribe(channel); }; const FALSE_VALUES = [ @@ -365,7 +341,7 @@ const startWorker = (workerId) => { const { path, query } = req; const onlyMedia = isTruthy(query.only_media); - switch(path) { + switch (path) { case '/api/v1/streaming/user': return 'user'; case '/api/v1/streaming/user/notification': @@ -496,7 +472,7 @@ const startWorker = (workerId) => { const listener = createSystemMessageListener(req, { - onKill () { + onKill() { res.end(); }, @@ -548,7 +524,7 @@ const startWorker = (workerId) => { }; /** - * @param {array} + * @param {array} arr * @param {number=} shift * @return {string} */ @@ -590,7 +566,7 @@ const startWorker = (workerId) => { * @return {function(string): void} */ const streamFrom = (ids, req, output, attachCloseHandler, needsFiltering = false) => { - const accountId = req.accountId || req.remoteAddress; + const accountId = req.accountId || req.remoteAddress; log.verbose(req.requestId, `Starting stream from ${ids.join(', ')} for ${accountId}`); @@ -602,8 +578,8 @@ const startWorker = (workerId) => { const { event, payload, queued_at } = json; const transmit = () => { - const now = new Date().getTime(); - const delta = now - queued_at; + const now = new Date().getTime(); + const delta = now - queued_at; const encodedPayload = typeof payload === 'object' ? JSON.stringify(payload) : payload; log.silly(req.requestId, `Transmitting for ${accountId}: ${event} ${encodedPayload} Delay: ${delta}ms`); @@ -617,9 +593,9 @@ const startWorker = (workerId) => { return; } - const unpackedPayload = payload; + const unpackedPayload = payload; const targetAccountIds = [unpackedPayload.account.id].concat(unpackedPayload.mentions.map(item => item.id)); - const accountDomain = unpackedPayload.account.acct.split('@')[1]; + const accountDomain = unpackedPayload.account.acct.split('@')[1]; if (Array.isArray(req.chosenLanguages) && unpackedPayload.language !== null && req.chosenLanguages.indexOf(unpackedPayload.language) === -1) { log.silly(req.requestId, `Message ${unpackedPayload.id} filtered by language (${unpackedPayload.language})`); @@ -639,7 +615,15 @@ const startWorker = (workerId) => { } const queries = [ - client.query(`SELECT 1 FROM blocks WHERE (account_id = $1 AND target_account_id IN (${placeholders(targetAccountIds, 2)})) OR (account_id = $2 AND target_account_id = $1) UNION SELECT 1 FROM mutes WHERE account_id = $1 AND target_account_id IN (${placeholders(targetAccountIds, 2)})`, [req.accountId, unpackedPayload.account.id].concat(targetAccountIds)), + client.query(`SELECT 1 + FROM blocks + WHERE (account_id = $1 AND target_account_id IN (${placeholders(targetAccountIds, 2)})) + OR (account_id = $2 AND target_account_id = $1) + UNION + SELECT 1 + FROM mutes + WHERE account_id = $1 + AND target_account_id IN (${placeholders(targetAccountIds, 2)})`, [req.accountId, unpackedPayload.account.id].concat(targetAccountIds)), ]; if (accountDomain) { @@ -702,12 +686,12 @@ const startWorker = (workerId) => { /** * @param {any} req * @param {function(): void} [closeHandler] - * @return {function(string[], function(string): void)} + * @return {function(string[]): void} */ - const streamHttpEnd = (req, closeHandler = undefined) => (ids, listener) => { + const streamHttpEnd = (req, closeHandler = undefined) => (ids) => { req.on('close', () => { ids.forEach(id => { - unsubscribe(id, listener); + unsubscribe(id); }); if (closeHandler) { @@ -754,7 +738,7 @@ const startWorker = (workerId) => { app.get('/api/v1/streaming/*', (req, res) => { channelNameToIds(req, channelNameFromPath(req), req.query).then(({ channelIds, options }) => { const onSend = streamToHttp(req, res); - const onEnd = streamHttpEnd(req, subscriptionHeartbeat(channelIds)); + const onEnd = streamHttpEnd(req, subscriptionHeartbeat(channelIds)); streamFrom(channelIds, req, onSend, onEnd, options.needsFiltering); }).catch(err => { @@ -797,7 +781,7 @@ const startWorker = (workerId) => { * @return {Promise.<{ channelIds: string[], options: { needsFiltering: boolean } }>} */ const channelNameToIds = (req, name, params) => new Promise((resolve, reject) => { - switch(name) { + switch (name) { case 'user': resolve({ channelIds: channelsForUserStream(req), @@ -927,14 +911,17 @@ const startWorker = (workerId) => { * @param {StreamParams} params */ const subscribeWebsocketToChannel = ({ socket, request, subscriptions }, channelName, params) => - checkScopes(request, channelName).then(() => channelNameToIds(request, channelName, params)).then(({ channelIds, options }) => { + checkScopes(request, channelName).then(() => channelNameToIds(request, channelName, params)).then(({ + channelIds, + options, + }) => { if (subscriptions[channelIds.join(';')]) { return; } - const onSend = streamToWs(request, socket, streamNameFromChannelName(channelName, params)); + const onSend = streamToWs(request, socket, streamNameFromChannelName(channelName, params)); const stopHeartbeat = subscriptionHeartbeat(channelIds); - const listener = streamFrom(channelIds, request, onSend, undefined, options.needsFiltering); + const listener = streamFrom(channelIds, request, onSend, undefined, options.needsFiltering); subscriptions[channelIds.join(';')] = { listener, @@ -982,7 +969,7 @@ const startWorker = (workerId) => { const listener = createSystemMessageListener(request, { - onKill () { + onKill() { socket.close(); }, @@ -992,7 +979,8 @@ const startWorker = (workerId) => { subscriptions[systemChannelId] = { listener, - stopHeartbeat: () => {}, + stopHeartbeat: () => { + }, }; }; @@ -1011,7 +999,7 @@ const startWorker = (workerId) => { wss.on('connection', (ws, req) => { const location = url.parse(req.url, true); - req.requestId = uuid.v4(); + req.requestId = uuid.v4(); req.remoteAddress = ws._socket.remoteAddress; ws.isAlive = true;