From ccb980beac2c918e81f9c9b5b5bbcc5901d11377 Mon Sep 17 00:00:00 2001 From: Emelia Smith Date: Mon, 2 Oct 2023 13:21:43 +0200 Subject: [PATCH] Fix websocket connections being incorrectly decremented twice on errors (#27238) --- streaming/index.js | 33 +++++++++++++++++++++------------ 1 file changed, 21 insertions(+), 12 deletions(-) diff --git a/streaming/index.js b/streaming/index.js index 8015c6815..3565ed278 100644 --- a/streaming/index.js +++ b/streaming/index.js @@ -1386,19 +1386,21 @@ const startServer = async () => { }; wss.on('connection', (ws, req) => { - const location = url.parse(req.url, true); + // Note: url.parse could throw, which would terminate the connection, so we + // increment the connected clients metric straight away when we establish + // the connection, without waiting: + connectedClients.labels({ type: 'websocket' }).inc(); + // Setup request properties: req.requestId = uuid.v4(); req.remoteAddress = ws._socket.remoteAddress; + // Setup connection keep-alive state: ws.isAlive = true; - ws.on('pong', () => { ws.isAlive = true; }); - connectedClients.labels({ type: 'websocket' }).inc(); - /** * @type {WebSocketSession} */ @@ -1408,27 +1410,31 @@ const startServer = async () => { subscriptions: {}, }; - const onEnd = () => { + ws.on('close', function onWebsocketClose() { const subscriptions = Object.keys(session.subscriptions); subscriptions.forEach(channelIds => { removeSubscription(session.subscriptions, channelIds.split(';'), req) }); + // Decrement the metrics for connected clients: + connectedClients.labels({ type: 'websocket' }).dec(); + // ensure garbage collection: session.socket = null; session.request = null; session.subscriptions = {}; + }); - connectedClients.labels({ type: 'websocket' }).dec(); - }; - - ws.on('close', onEnd); - ws.on('error', onEnd); + // Note: immediately after the `error` event is emitted, the `close` event + // is emitted. As such, all we need to do is log the error here. + ws.on('error', (err) => { + log.error('websocket', err.toString()); + }); ws.on('message', (data, isBinary) => { if (isBinary) { - log.warn('socket', 'Received binary data, closing connection'); + log.warn('websocket', 'Received binary data, closing connection'); ws.close(1003, 'The mastodon streaming server does not support binary messages'); return; } @@ -1451,7 +1457,10 @@ const startServer = async () => { subscribeWebsocketToSystemChannel(session); - if (location.query.stream) { + // Parse the URL for the connection arguments (if supplied), url.parse can throw: + const location = req.url && url.parse(req.url, true); + + if (location && location.query.stream) { subscribeWebsocketToChannel(session, firstParam(location.query.stream), location.query); } });