From 1792be342a3cfad7bdfa54311b3962a8051962bb Mon Sep 17 00:00:00 2001 From: Claire Date: Thu, 27 Jul 2023 15:12:10 +0200 Subject: [PATCH 1/6] Fix wrong filters sometimes applying in streaming (#26159) --- streaming/index.js | 36 +++++++++++++++++++----------------- 1 file changed, 19 insertions(+), 17 deletions(-) diff --git a/streaming/index.js b/streaming/index.js index ffb570c5b..fe6d36dce 100644 --- a/streaming/index.js +++ b/streaming/index.js @@ -636,7 +636,7 @@ const startWorker = async (workerId) => { const listener = message => { const { event, payload, queued_at } = message; - const transmit = () => { + const transmit = (payload) => { const now = new Date().getTime(); const delta = now - queued_at; const encodedPayload = typeof payload === 'object' ? JSON.stringify(payload) : payload; @@ -648,22 +648,21 @@ const startWorker = async (workerId) => { // Only messages that may require filtering are statuses, since notifications // are already personalized and deletes do not matter if (!needsFiltering || event !== 'update') { - transmit(); + transmit(payload); return; } - const unpackedPayload = payload; - const targetAccountIds = [unpackedPayload.account.id].concat(unpackedPayload.mentions.map(item => item.id)); - const accountDomain = unpackedPayload.account.acct.split('@')[1]; + const targetAccountIds = [payload.account.id].concat(payload.mentions.map(item => item.id)); + const accountDomain = payload.account.acct.split('@')[1]; - if (Array.isArray(req.chosenLanguages) && unpackedPayload.language !== null && req.chosenLanguages.indexOf(unpackedPayload.language) === -1) { - log.silly(req.requestId, `Message ${unpackedPayload.id} filtered by language (${unpackedPayload.language})`); + if (Array.isArray(req.chosenLanguages) && payload.language !== null && req.chosenLanguages.indexOf(payload.language) === -1) { + log.silly(req.requestId, `Message ${payload.id} filtered by language (${payload.language})`); return; } // When the account is not logged in, it is not necessary to confirm the block or mute if (!req.accountId) { - transmit(); + transmit(payload); return; } @@ -682,14 +681,14 @@ const startWorker = async (workerId) => { SELECT 1 FROM mutes WHERE account_id = $1 - AND target_account_id IN (${placeholders(targetAccountIds, 2)})`, [req.accountId, unpackedPayload.account.id].concat(targetAccountIds)), + AND target_account_id IN (${placeholders(targetAccountIds, 2)})`, [req.accountId, payload.account.id].concat(targetAccountIds)), ]; if (accountDomain) { queries.push(client.query('SELECT 1 FROM account_domain_blocks WHERE account_id = $1 AND domain = $2', [req.accountId, accountDomain])); } - if (!unpackedPayload.filtered && !req.cachedFilters) { + if (!payload.filtered && !req.cachedFilters) { queries.push(client.query('SELECT filter.id AS id, filter.phrase AS title, filter.context AS context, filter.expires_at AS expires_at, filter.action AS filter_action, keyword.keyword AS keyword, keyword.whole_word AS whole_word FROM custom_filter_keywords keyword JOIN custom_filters filter ON keyword.custom_filter_id = filter.id WHERE filter.account_id = $1 AND (filter.expires_at IS NULL OR filter.expires_at > NOW())', [req.accountId])); } @@ -700,7 +699,7 @@ const startWorker = async (workerId) => { return; } - if (!unpackedPayload.filtered && !req.cachedFilters) { + if (!payload.filtered && !req.cachedFilters) { const filterRows = values[accountDomain ? 2 : 1].rows; req.cachedFilters = filterRows.reduce((cache, row) => { @@ -743,27 +742,30 @@ const startWorker = async (workerId) => { } // Check filters - if (req.cachedFilters && !unpackedPayload.filtered) { - const status = unpackedPayload; + if (req.cachedFilters && !payload.filtered) { + const mutatedPayload = { ...payload }; + const status = payload; const searchContent = ([status.spoiler_text || '', status.content].concat((status.poll && status.poll.options) ? status.poll.options.map(option => option.title) : [])).concat(status.media_attachments.map(att => att.description)).join('\n\n').replace(//g, '\n').replace(/<\/p>

/g, '\n\n'); const searchIndex = JSDOM.fragment(searchContent).textContent; const now = new Date(); - payload.filtered = []; + mutatedPayload.filtered = []; Object.values(req.cachedFilters).forEach((cachedFilter) => { if ((cachedFilter.expires_at === null || cachedFilter.expires_at > now)) { const keyword_matches = searchIndex.match(cachedFilter.regexp); if (keyword_matches) { - payload.filtered.push({ + mutatedPayload.filtered.push({ filter: cachedFilter.repr, keyword_matches, }); } } }); - } - transmit(); + transmit(mutatedPayload); + } else { + transmit(payload); + } }).catch(err => { log.error(err); done(); From da230600acda1d1a151eab4caa3d536ce828a097 Mon Sep 17 00:00:00 2001 From: Emelia Smith Date: Thu, 27 Jul 2023 15:38:18 +0200 Subject: [PATCH 2/6] Refactor streaming's filtering logic & improve documentation (#26213) --- streaming/index.js | 162 ++++++++++++++++++++++++++++++--------------- 1 file changed, 110 insertions(+), 52 deletions(-) diff --git a/streaming/index.js b/streaming/index.js index fe6d36dce..010dee581 100644 --- a/streaming/index.js +++ b/streaming/index.js @@ -632,29 +632,39 @@ const startWorker = async (workerId) => { log.verbose(req.requestId, `Starting stream from ${ids.join(', ')} for ${accountId}`); - // Currently message is of type string, soon it'll be Record + const transmit = (event, payload) => { + // TODO: Replace "string"-based delete payloads with object payloads: + const encodedPayload = typeof payload === 'object' ? JSON.stringify(payload) : payload; + + log.silly(req.requestId, `Transmitting for ${accountId}: ${event} ${encodedPayload}`); + output(event, encodedPayload); + }; + + // The listener used to process each message off the redis subscription, + // message here is an object with an `event` and `payload` property. Some + // events also include a queued_at value, but this is being removed shortly. const listener = message => { - const { event, payload, queued_at } = message; + const { event, payload } = message; - const transmit = (payload) => { - const now = new Date().getTime(); - const delta = now - queued_at; - const encodedPayload = typeof payload === 'object' ? JSON.stringify(payload) : payload; - - log.silly(req.requestId, `Transmitting for ${accountId}: ${event} ${encodedPayload} Delay: ${delta}ms`); - output(event, encodedPayload); - }; - - // Only messages that may require filtering are statuses, since notifications - // are already personalized and deletes do not matter - if (!needsFiltering || event !== 'update') { - transmit(payload); + // Streaming only needs to apply filtering to some channels and only to + // some events. This is because majority of the filtering happens on the + // Ruby on Rails side when producing the event for streaming. + // + // The only events that require filtering from the streaming server are + // `update` and `status.update`, all other events are transmitted to the + // client as soon as they're received (pass-through). + // + // The channels that need filtering are determined in the function + // `channelNameToIds` defined below: + if (!needsFiltering || (event !== 'update' && event !== 'status.update')) { + transmit(event, payload); return; } - const targetAccountIds = [payload.account.id].concat(payload.mentions.map(item => item.id)); - const accountDomain = payload.account.acct.split('@')[1]; + // The rest of the logic from here on in this function is to handle + // filtering of statuses: + // Filter based on language: if (Array.isArray(req.chosenLanguages) && payload.language !== null && req.chosenLanguages.indexOf(payload.language) === -1) { log.silly(req.requestId, `Message ${payload.id} filtered by language (${payload.language})`); return; @@ -662,11 +672,16 @@ const startWorker = async (workerId) => { // When the account is not logged in, it is not necessary to confirm the block or mute if (!req.accountId) { - transmit(payload); + transmit(event, payload); return; } - pgPool.connect((err, client, done) => { + // Filter based on domain blocks, blocks, mutes, or custom filters: + const targetAccountIds = [payload.account.id].concat(payload.mentions.map(item => item.id)); + const accountDomain = payload.account.acct.split('@')[1]; + + // TODO: Move this logic out of the message handling loop + pgPool.connect((err, client, releasePgConnection) => { if (err) { log.error(err); return; @@ -693,28 +708,45 @@ const startWorker = async (workerId) => { } Promise.all(queries).then(values => { - done(); + releasePgConnection(); + // Handling blocks & mutes and domain blocks: If one of those applies, + // then we don't transmit the payload of the event to the client if (values[0].rows.length > 0 || (accountDomain && values[1].rows.length > 0)) { return; } - if (!payload.filtered && !req.cachedFilters) { + // If the payload already contains the `filtered` property, it means + // that filtering has been applied on the ruby on rails side, as + // such, we don't need to construct or apply the filters in streaming: + if (Object.prototype.hasOwnProperty.call(payload, "filtered")) { + transmit(event, payload); + return; + } + + // Handling for constructing the custom filters and caching them on the request + // TODO: Move this logic out of the message handling lifecycle + if (!req.cachedFilters) { const filterRows = values[accountDomain ? 2 : 1].rows; - req.cachedFilters = filterRows.reduce((cache, row) => { - if (cache[row.id]) { - cache[row.id].keywords.push([row.keyword, row.whole_word]); + req.cachedFilters = filterRows.reduce((cache, filter) => { + if (cache[filter.id]) { + cache[filter.id].keywords.push([filter.keyword, filter.whole_word]); } else { - cache[row.id] = { - keywords: [[row.keyword, row.whole_word]], - expires_at: row.expires_at, - repr: { - id: row.id, - title: row.title, - context: row.context, - expires_at: row.expires_at, - filter_action: ['warn', 'hide'][row.filter_action], + cache[filter.id] = { + keywords: [[filter.keyword, filter.whole_word]], + expires_at: filter.expires_at, + filter: { + id: filter.id, + title: filter.title, + context: filter.context, + expires_at: filter.expires_at, + // filter.filter_action is the value from the + // custom_filters.action database column, it is an integer + // representing a value in an enum defined by Ruby on Rails: + // + // enum { warn: 0, hide: 1 } + filter_action: ['warn', 'hide'][filter.filter_action], }, }; } @@ -722,6 +754,10 @@ const startWorker = async (workerId) => { return cache; }, {}); + // Construct the regular expressions for the custom filters: This + // needs to be done in a separate loop as the database returns one + // filterRow per keyword, so we need all the keywords before + // constructing the regular expression Object.keys(req.cachedFilters).forEach((key) => { req.cachedFilters[key].regexp = new RegExp(req.cachedFilters[key].keywords.map(([keyword, whole_word]) => { let expr = keyword.replace(/[.*+?^${}()|[\]\\]/g, '\\$&'); @@ -741,34 +777,56 @@ const startWorker = async (workerId) => { }); } - // Check filters - if (req.cachedFilters && !payload.filtered) { - const mutatedPayload = { ...payload }; + // Apply cachedFilters against the payload, constructing a + // `filter_results` array of FilterResult entities + if (req.cachedFilters) { const status = payload; - const searchContent = ([status.spoiler_text || '', status.content].concat((status.poll && status.poll.options) ? status.poll.options.map(option => option.title) : [])).concat(status.media_attachments.map(att => att.description)).join('\n\n').replace(//g, '\n').replace(/<\/p>

/g, '\n\n'); - const searchIndex = JSDOM.fragment(searchContent).textContent; + // TODO: Calculate searchableContent in Ruby on Rails: + const searchableContent = ([status.spoiler_text || '', status.content].concat((status.poll && status.poll.options) ? status.poll.options.map(option => option.title) : [])).concat(status.media_attachments.map(att => att.description)).join('\n\n').replace(//g, '\n').replace(/<\/p>

/g, '\n\n'); + const searchableTextContent = JSDOM.fragment(searchableContent).textContent; const now = new Date(); - mutatedPayload.filtered = []; - Object.values(req.cachedFilters).forEach((cachedFilter) => { - if ((cachedFilter.expires_at === null || cachedFilter.expires_at > now)) { - const keyword_matches = searchIndex.match(cachedFilter.regexp); - if (keyword_matches) { - mutatedPayload.filtered.push({ - filter: cachedFilter.repr, - keyword_matches, - }); - } + const filter_results = Object.values(req.cachedFilters).reduce((results, cachedFilter) => { + // Check the filter hasn't expired before applying: + if (cachedFilter.expires_at !== null && cachedFilter.expires_at < now) { + return; } - }); - transmit(mutatedPayload); + // Just in-case JSDOM fails to find textContent in searchableContent + if (!searchableTextContent) { + return; + } + + const keyword_matches = searchableTextContent.match(cachedFilter.regexp); + if (keyword_matches) { + // results is an Array of FilterResult; status_matches is always + // null as we only are only applying the keyword-based custom + // filters, not the status-based custom filters. + // https://docs.joinmastodon.org/entities/FilterResult/ + results.push({ + filter: cachedFilter.filter, + keyword_matches, + status_matches: null + }); + } + }, []); + + // Send the payload + the FilterResults as the `filtered` property + // to the streaming connection. To reach this code, the `event` must + // have been either `update` or `status.update`, meaning the + // `payload` is a Status entity, which has a `filtered` property: + // + // filtered: https://docs.joinmastodon.org/entities/Status/#filtered + transmit(event, { + ...payload, + filtered: filter_results + }); } else { - transmit(payload); + transmit(event, payload); } }).catch(err => { + releasePgConnection(); log.error(err); - done(); }); }); }; From 6c321bb5e1543c78dbd0fa8e4962e95e544e1f63 Mon Sep 17 00:00:00 2001 From: Claire Date: Sat, 22 Jul 2023 20:42:31 +0200 Subject: [PATCH 3/6] Fix incorrect connect timeout in outgoing requests (#26116) --- app/lib/request.rb | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/app/lib/request.rb b/app/lib/request.rb index 9c02410a5..660dc0fa7 100644 --- a/app/lib/request.rb +++ b/app/lib/request.rb @@ -285,11 +285,11 @@ class Request end until socks.empty? - _, available_socks, = IO.select(nil, socks, nil, Request::TIMEOUT[:connect]) + _, available_socks, = IO.select(nil, socks, nil, Request::TIMEOUT[:connect_timeout]) if available_socks.nil? socks.each(&:close) - raise HTTP::TimeoutError, "Connect timed out after #{Request::TIMEOUT[:connect]} seconds" + raise HTTP::TimeoutError, "Connect timed out after #{Request::TIMEOUT[:connect_timeout]} seconds" end available_socks.each do |sock| From 2eb1a5b7b6d8b6a0b9426e7ee5a1fd04519dd7e2 Mon Sep 17 00:00:00 2001 From: Emelia Smith Date: Fri, 28 Jul 2023 12:06:29 +0200 Subject: [PATCH 4/6] Fix: Streaming server memory leak in HTTP EventSource cleanup (#26228) --- streaming/index.js | 26 +++++++++++++++++--------- 1 file changed, 17 insertions(+), 9 deletions(-) diff --git a/streaming/index.js b/streaming/index.js index 010dee581..1913be81f 100644 --- a/streaming/index.js +++ b/streaming/index.js @@ -226,9 +226,15 @@ const startWorker = async (workerId) => { callbacks.forEach(callback => callback(json)); }; + /** + * @callback SubscriptionListener + * @param {ReturnType} json of the message + * @returns void + */ + /** * @param {string} channel - * @param {function(string): void} callback + * @param {SubscriptionListener} callback */ const subscribe = (channel, callback) => { log.silly(`Adding listener for ${channel}`); @@ -245,7 +251,7 @@ const startWorker = async (workerId) => { /** * @param {string} channel - * @param {function(Object): void} callback + * @param {SubscriptionListener} callback */ const unsubscribe = (channel, callback) => { log.silly(`Removing listener for ${channel}`); @@ -623,9 +629,9 @@ const startWorker = async (workerId) => { * @param {string[]} ids * @param {any} req * @param {function(string, string): void} output - * @param {function(string[], function(string): void): void} attachCloseHandler + * @param {undefined | function(string[], SubscriptionListener): void} attachCloseHandler * @param {boolean=} needsFiltering - * @returns {function(object): void} + * @returns {SubscriptionListener} */ const streamFrom = (ids, req, output, attachCloseHandler, needsFiltering = false) => { const accountId = req.accountId || req.remoteAddress; @@ -643,6 +649,7 @@ const startWorker = async (workerId) => { // The listener used to process each message off the redis subscription, // message here is an object with an `event` and `payload` property. Some // events also include a queued_at value, but this is being removed shortly. + /** @type {SubscriptionListener} */ const listener = message => { const { event, payload } = message; @@ -835,7 +842,7 @@ const startWorker = async (workerId) => { subscribe(`${redisPrefix}${id}`, listener); }); - if (attachCloseHandler) { + if (typeof attachCloseHandler === 'function') { attachCloseHandler(ids.map(id => `${redisPrefix}${id}`), listener); } @@ -872,12 +879,13 @@ const startWorker = async (workerId) => { /** * @param {any} req * @param {function(): void} [closeHandler] - * @return {function(string[]): void} + * @returns {function(string[], SubscriptionListener): void} */ - const streamHttpEnd = (req, closeHandler = undefined) => (ids) => { + + const streamHttpEnd = (req, closeHandler = undefined) => (ids, listener) => { req.on('close', () => { ids.forEach(id => { - unsubscribe(id); + unsubscribe(id, listener); }); if (closeHandler) { @@ -1137,7 +1145,7 @@ const startWorker = async (workerId) => { * @typedef WebSocketSession * @property {any} socket * @property {any} request - * @property {Object.} subscriptions + * @property {Object.} subscriptions */ /** From 2fc6117d1b5643c0de908706d22702a35388a2a4 Mon Sep 17 00:00:00 2001 From: Renaud Chaput Date: Fri, 28 Jul 2023 19:11:58 +0200 Subject: [PATCH 5/6] Fix missing return values in streaming (#26233) --- streaming/index.js | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/streaming/index.js b/streaming/index.js index 1913be81f..3fea71f94 100644 --- a/streaming/index.js +++ b/streaming/index.js @@ -724,7 +724,7 @@ const startWorker = async (workerId) => { } // If the payload already contains the `filtered` property, it means - // that filtering has been applied on the ruby on rails side, as + // that filtering has been applied on the ruby on rails side, as // such, we don't need to construct or apply the filters in streaming: if (Object.prototype.hasOwnProperty.call(payload, "filtered")) { transmit(event, payload); @@ -796,12 +796,12 @@ const startWorker = async (workerId) => { const filter_results = Object.values(req.cachedFilters).reduce((results, cachedFilter) => { // Check the filter hasn't expired before applying: if (cachedFilter.expires_at !== null && cachedFilter.expires_at < now) { - return; + return results; } // Just in-case JSDOM fails to find textContent in searchableContent if (!searchableTextContent) { - return; + return results; } const keyword_matches = searchableTextContent.match(cachedFilter.regexp); @@ -816,6 +816,8 @@ const startWorker = async (workerId) => { status_matches: null }); } + + return results; }, []); // Send the payload + the FilterResults as the `filtered` property From ac7d40b561101084baf4688167d155600eefe9dc Mon Sep 17 00:00:00 2001 From: Claire Date: Thu, 27 Jul 2023 17:08:09 +0200 Subject: [PATCH 6/6] Bump version to v4.1.6 --- CHANGELOG.md | 8 ++++++++ docker-compose.yml | 6 +++--- lib/mastodon/version.rb | 2 +- 3 files changed, 12 insertions(+), 4 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 5ab9eb10d..aeb89ef5c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,14 @@ Changelog All notable changes to this project will be documented in this file. +## [4.1.6] - 2023-07-31 + +### Fixed + +- Fix memory leak in streaming server ([ThisIsMissEm](https://github.com/mastodon/mastodon/pull/26228)) +- Fix wrong filters sometimes applying in streaming ([ClearlyClaire](https://github.com/mastodon/mastodon/pull/26159), [ThisIsMissEm](https://github.com/mastodon/mastodon/pull/26213), [renchap](https://github.com/mastodon/mastodon/pull/26233)) +- Fix incorrect connect timeout in outgoing requests ([ClearlyClaire](https://github.com/mastodon/mastodon/pull/26116)) + ## [4.1.5] - 2023-07-21 ### Added diff --git a/docker-compose.yml b/docker-compose.yml index e3fa9ae1e..ae6aefb37 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -56,7 +56,7 @@ services: web: build: . - image: ghcr.io/mastodon/mastodon:v4.1.5 + image: ghcr.io/mastodon/mastodon:v4.1.6 restart: always env_file: .env.production command: bash -c "rm -f /mastodon/tmp/pids/server.pid; bundle exec rails s -p 3000" @@ -77,7 +77,7 @@ services: streaming: build: . - image: ghcr.io/mastodon/mastodon:v4.1.5 + image: ghcr.io/mastodon/mastodon:v4.1.6 restart: always env_file: .env.production command: node ./streaming @@ -95,7 +95,7 @@ services: sidekiq: build: . - image: ghcr.io/mastodon/mastodon:v4.1.5 + image: ghcr.io/mastodon/mastodon:v4.1.6 restart: always env_file: .env.production command: bundle exec sidekiq diff --git a/lib/mastodon/version.rb b/lib/mastodon/version.rb index e1b0afb7e..1277084f0 100644 --- a/lib/mastodon/version.rb +++ b/lib/mastodon/version.rb @@ -13,7 +13,7 @@ module Mastodon end def patch - 5 + 6 end def flags