Fix websocket connections being incorrectly decremented twice on errors (#27238)

This commit is contained in:
Emelia Smith 2023-10-02 13:21:43 +02:00 committed by Claire
parent ac32f4b3c3
commit ccb980beac

View file

@ -1386,19 +1386,21 @@ const startServer = async () => {
}; };
wss.on('connection', (ws, req) => { wss.on('connection', (ws, req) => {
const location = url.parse(req.url, true); // Note: url.parse could throw, which would terminate the connection, so we
// increment the connected clients metric straight away when we establish
// the connection, without waiting:
connectedClients.labels({ type: 'websocket' }).inc();
// Setup request properties:
req.requestId = uuid.v4(); req.requestId = uuid.v4();
req.remoteAddress = ws._socket.remoteAddress; req.remoteAddress = ws._socket.remoteAddress;
// Setup connection keep-alive state:
ws.isAlive = true; ws.isAlive = true;
ws.on('pong', () => { ws.on('pong', () => {
ws.isAlive = true; ws.isAlive = true;
}); });
connectedClients.labels({ type: 'websocket' }).inc();
/** /**
* @type {WebSocketSession} * @type {WebSocketSession}
*/ */
@ -1408,27 +1410,31 @@ const startServer = async () => {
subscriptions: {}, subscriptions: {},
}; };
const onEnd = () => { ws.on('close', function onWebsocketClose() {
const subscriptions = Object.keys(session.subscriptions); const subscriptions = Object.keys(session.subscriptions);
subscriptions.forEach(channelIds => { subscriptions.forEach(channelIds => {
removeSubscription(session.subscriptions, channelIds.split(';'), req) removeSubscription(session.subscriptions, channelIds.split(';'), req)
}); });
// Decrement the metrics for connected clients:
connectedClients.labels({ type: 'websocket' }).dec();
// ensure garbage collection: // ensure garbage collection:
session.socket = null; session.socket = null;
session.request = null; session.request = null;
session.subscriptions = {}; session.subscriptions = {};
});
connectedClients.labels({ type: 'websocket' }).dec(); // Note: immediately after the `error` event is emitted, the `close` event
}; // is emitted. As such, all we need to do is log the error here.
ws.on('error', (err) => {
ws.on('close', onEnd); log.error('websocket', err.toString());
ws.on('error', onEnd); });
ws.on('message', (data, isBinary) => { ws.on('message', (data, isBinary) => {
if (isBinary) { if (isBinary) {
log.warn('socket', 'Received binary data, closing connection'); log.warn('websocket', 'Received binary data, closing connection');
ws.close(1003, 'The mastodon streaming server does not support binary messages'); ws.close(1003, 'The mastodon streaming server does not support binary messages');
return; return;
} }
@ -1451,7 +1457,10 @@ const startServer = async () => {
subscribeWebsocketToSystemChannel(session); subscribeWebsocketToSystemChannel(session);
if (location.query.stream) { // Parse the URL for the connection arguments (if supplied), url.parse can throw:
const location = req.url && url.parse(req.url, true);
if (location && location.query.stream) {
subscribeWebsocketToChannel(session, firstParam(location.query.stream), location.query); subscribeWebsocketToChannel(session, firstParam(location.query.stream), location.query);
} }
}); });