diff --git a/streaming/index.js b/streaming/index.js index 279ebbad8..4b2607ed9 100644 --- a/streaming/index.js +++ b/streaming/index.js @@ -52,18 +52,31 @@ const redisUrlToClient = async (defaultConfig, redisUrl) => { }; /** + * Attempts to safely parse a string as JSON, used when both receiving a message + * from redis and when receiving a message from a client over a websocket + * connection, this is why it accepts a `req` argument. * @param {string} json - * @param {any} req + * @param {any?} req * @returns {Object.|null} */ const parseJSON = (json, req) => { try { return JSON.parse(json); } catch (err) { - if (req.accountId) { - log.warn(req.requestId, `Error parsing message from user ${req.accountId}: ${err}`); + /* FIXME: This logging isn't great, and should probably be done at the + * call-site of parseJSON, not in the method, but this would require changing + * the signature of parseJSON to return something akin to a Result type: + * [Error|null, null|Object { const { redisParams, redisUrl, redisPrefix } = redisConfigFromEnv(process.env); /** - * @type {Object.>} + * @type {Object.): void>>} */ const subs = {}; @@ -203,7 +216,10 @@ const startServer = async () => { return; } - callbacks.forEach(callback => callback(message)); + const json = parseJSON(message, null); + if (!json) return; + + callbacks.forEach(callback => callback(json)); }; /** @@ -225,7 +241,7 @@ const startServer = async () => { /** * @param {string} channel - * @param {function(string): void} callback + * @param {function(Object): void} callback */ const unsubscribe = (channel, callback) => { log.silly(`Removing listener for ${channel}`); @@ -369,7 +385,7 @@ const startServer = async () => { /** * @param {any} req - * @returns {string} + * @returns {string|undefined} */ const channelNameFromPath = req => { const { path, query } = req; @@ -478,15 +494,11 @@ const startServer = async () => { /** * @param {any} req * @param {SystemMessageHandlers} eventHandlers - * @returns {function(string): void} + * @returns {function(object): void} */ const createSystemMessageListener = (req, eventHandlers) => { return message => { - const json = parseJSON(message, req); - - if (!json) return; - - const { event } = json; + const { event } = message; log.silly(req.requestId, `System message for ${req.accountId}: ${event}`); @@ -603,19 +615,16 @@ const startServer = async () => { * @param {function(string, string): void} output * @param {function(string[], function(string): void): void} attachCloseHandler * @param {boolean=} needsFiltering - * @returns {function(string): void} + * @returns {function(object): void} */ const streamFrom = (ids, req, output, attachCloseHandler, needsFiltering = false) => { const accountId = req.accountId || req.remoteAddress; log.verbose(req.requestId, `Starting stream from ${ids.join(', ')} for ${accountId}`); + // Currently message is of type string, soon it'll be Record const listener = message => { - const json = parseJSON(message, req); - - if (!json) return; - - const { event, payload, queued_at } = json; + const { event, payload, queued_at } = message; const transmit = () => { const now = new Date().getTime(); @@ -1198,8 +1207,15 @@ const startServer = async () => { ws.on('close', onEnd); ws.on('error', onEnd); - ws.on('message', data => { - const json = parseJSON(data, session.request); + ws.on('message', (data, isBinary) => { + if (isBinary) { + log.debug('Received binary data, closing connection'); + ws.close(1003, 'The mastodon streaming server does not support binary messages'); + return; + } + const message = data.toString('utf8'); + + const json = parseJSON(message, session.request); if (!json) return;