diff --git a/package.json b/package.json index 72a378299..76ece5f15 100644 --- a/package.json +++ b/package.json @@ -94,6 +94,7 @@ "pg-connection-string": "^2.6.0", "postcss": "^8.4.24", "postcss-loader": "^4.3.0", + "prom-client": "^14.2.0", "prop-types": "^15.8.1", "punycode": "^2.3.0", "react": "^18.2.0", diff --git a/streaming/index.js b/streaming/index.js index 8a34c273b..2112ca433 100644 --- a/streaming/index.js +++ b/streaming/index.js @@ -10,6 +10,7 @@ const { JSDOM } = require('jsdom'); const log = require('npmlog'); const pg = require('pg'); const dbUrlToConfig = require('pg-connection-string').parse; +const metrics = require('prom-client'); const redis = require('redis'); const uuid = require('uuid'); const WebSocket = require('ws'); @@ -183,6 +184,73 @@ const startServer = async () => { const redisSubscribeClient = await redisUrlToClient(redisParams, redisUrl); const redisClient = await redisUrlToClient(redisParams, redisUrl); + // Collect metrics from Node.js + metrics.collectDefaultMetrics(); + + new metrics.Gauge({ + name: 'pg_pool_total_connections', + help: 'The total number of clients existing within the pool', + collect() { + this.set(pgPool.totalCount); + }, + }); + + new metrics.Gauge({ + name: 'pg_pool_idle_connections', + help: 'The number of clients which are not checked out but are currently idle in the pool', + collect() { + this.set(pgPool.idleCount); + }, + }); + + new metrics.Gauge({ + name: 'pg_pool_waiting_queries', + help: 'The number of queued requests waiting on a client when all clients are checked out', + collect() { + this.set(pgPool.waitingCount); + }, + }); + + const connectedClients = new metrics.Gauge({ + name: 'connected_clients', + help: 'The number of clients connected to the streaming server', + labelNames: ['type'], + }); + + connectedClients.set({ type: 'websocket' }, 0); + connectedClients.set({ type: 'eventsource' }, 0); + + const connectedChannels = new metrics.Gauge({ + name: 'connected_channels', + help: 'The number of channels the streaming server is streaming to', + labelNames: [ 'type', 'channel' ] + }); + + const redisSubscriptions = new metrics.Gauge({ + name: 'redis_subscriptions', + help: 'The number of Redis channels the streaming server is subscribed to', + }); + + // When checking metrics in the browser, the favicon is requested this + // prevents the request from falling through to the API Router, which would + // error for this endpoint: + app.get('/favicon.ico', (req, res) => res.status(404).end()); + + app.get('/api/v1/streaming/health', (req, res) => { + res.writeHead(200, { 'Content-Type': 'text/plain' }); + res.end('OK'); + }); + + app.get('/metrics', async (req, res) => { + try { + res.set('Content-Type', metrics.register.contentType); + res.end(await metrics.register.metrics()); + } catch (ex) { + log.error(ex); + res.status(500).end(); + } + }); + /** * @param {string[]} channels * @returns {function(): void} @@ -240,6 +308,7 @@ const startServer = async () => { if (subs[channel].length === 0) { log.verbose(`Subscribe ${channel}`); redisSubscribeClient.subscribe(channel, onRedisMessage); + redisSubscriptions.inc(); } subs[channel].push(callback); @@ -261,6 +330,7 @@ const startServer = async () => { if (subs[channel].length === 0) { log.verbose(`Unsubscribe ${channel}`); redisSubscribeClient.unsubscribe(channel); + redisSubscriptions.dec(); delete subs[channel]; } }; @@ -434,7 +504,7 @@ const startServer = async () => { /** * @param {any} req - * @param {string} channelName + * @param {string|undefined} channelName * @returns {Promise.} */ const checkScopes = (req, channelName) => new Promise((resolve, reject) => { @@ -537,10 +607,14 @@ const startServer = async () => { res.on('close', () => { unsubscribe(`${redisPrefix}${accessTokenChannelId}`, listener); unsubscribe(`${redisPrefix}${systemChannelId}`, listener); + + connectedChannels.labels({ type: 'eventsource', channel: 'system' }).dec(2); }); subscribe(`${redisPrefix}${accessTokenChannelId}`, listener); subscribe(`${redisPrefix}${systemChannelId}`, listener); + + connectedChannels.labels({ type: 'eventsource', channel: 'system' }).inc(2); }; /** @@ -554,7 +628,19 @@ const startServer = async () => { return; } - accountFromRequest(req).then(() => checkScopes(req, channelNameFromPath(req))).then(() => { + const channelName = channelNameFromPath(req); + + // If no channelName can be found for the request, then we should terminate + // the connection, as there's nothing to stream back + if (!channelName) { + const err = new Error('Unknown channel requested'); + err.status = 400; + + next(err); + return; + } + + accountFromRequest(req).then(() => checkScopes(req, channelName)).then(() => { subscribeHttpToSystemChannel(req, res); }).then(() => { next(); @@ -849,6 +935,15 @@ const startServer = async () => { const streamToHttp = (req, res) => { const accountId = req.accountId || req.remoteAddress; + const channelName = channelNameFromPath(req); + + connectedClients.labels({ type: 'eventsource' }).inc(); + + // In theory we'll always have a channel name, but channelNameFromPath can return undefined: + if (typeof channelName === 'string') { + connectedChannels.labels({ type: 'eventsource', channel: channelName }).inc(); + } + res.setHeader('Content-Type', 'text/event-stream'); res.setHeader('Cache-Control', 'no-store'); res.setHeader('Transfer-Encoding', 'chunked'); @@ -859,6 +954,14 @@ const startServer = async () => { req.on('close', () => { log.verbose(req.requestId, `Ending stream for ${accountId}`); + // We decrement these counters here instead of in streamHttpEnd as in that + // method we don't have knowledge of the channel names + connectedClients.labels({ type: 'eventsource' }).dec(); + // In theory we'll always have a channel name, but channelNameFromPath can return undefined: + if (typeof channelName === 'string') { + connectedChannels.labels({ type: 'eventsource', channel: channelName }).dec(); + } + clearInterval(heartbeat); }); @@ -913,40 +1016,18 @@ const startServer = async () => { res.end(JSON.stringify({ error: 'Not found' })); }; - app.use(setRequestId); - app.use(setRemoteAddress); - app.use(allowCrossDomain); + const api = express.Router(); - app.get('/api/v1/streaming/health', (req, res) => { - res.writeHead(200, { 'Content-Type': 'text/plain' }); - res.end('OK'); - }); + app.use(api); - app.get('/metrics', (req, res) => server.getConnections((err, count) => { - res.writeHeader(200, { 'Content-Type': 'application/openmetrics-text; version=1.0.0; charset=utf-8' }); - res.write('# TYPE connected_clients gauge\n'); - res.write('# HELP connected_clients The number of clients connected to the streaming server\n'); - res.write(`connected_clients ${count}.0\n`); - res.write('# TYPE connected_channels gauge\n'); - res.write('# HELP connected_channels The number of Redis channels the streaming server is subscribed to\n'); - res.write(`connected_channels ${Object.keys(subs).length}.0\n`); - res.write('# TYPE pg_pool_total_connections gauge\n'); - res.write('# HELP pg_pool_total_connections The total number of clients existing within the pool\n'); - res.write(`pg_pool_total_connections ${pgPool.totalCount}.0\n`); - res.write('# TYPE pg_pool_idle_connections gauge\n'); - res.write('# HELP pg_pool_idle_connections The number of clients which are not checked out but are currently idle in the pool\n'); - res.write(`pg_pool_idle_connections ${pgPool.idleCount}.0\n`); - res.write('# TYPE pg_pool_waiting_queries gauge\n'); - res.write('# HELP pg_pool_waiting_queries The number of queued requests waiting on a client when all clients are checked out\n'); - res.write(`pg_pool_waiting_queries ${pgPool.waitingCount}.0\n`); - res.write('# EOF\n'); - res.end(); - })); + api.use(setRequestId); + api.use(setRemoteAddress); + api.use(allowCrossDomain); - app.use(authenticationMiddleware); - app.use(errorMiddleware); + api.use(authenticationMiddleware); + api.use(errorMiddleware); - app.get('/api/v1/streaming/*', (req, res) => { + api.get('/api/v1/streaming/*', (req, res) => { channelNameToIds(req, channelNameFromPath(req), req.query).then(({ channelIds, options }) => { const onSend = streamToHttp(req, res); const onEnd = streamHttpEnd(req, subscriptionHeartbeat(channelIds)); @@ -1141,15 +1222,16 @@ const startServer = async () => { * @typedef WebSocketSession * @property {any} socket * @property {any} request - * @property {Object.} subscriptions + * @property {Object.} subscriptions */ /** * @param {WebSocketSession} session * @param {string} channelName * @param {StreamParams} params + * @returns {void} */ - const subscribeWebsocketToChannel = ({ socket, request, subscriptions }, channelName, params) => + const subscribeWebsocketToChannel = ({ socket, request, subscriptions }, channelName, params) => { checkScopes(request, channelName).then(() => channelNameToIds(request, channelName, params)).then(({ channelIds, options, @@ -1162,7 +1244,10 @@ const startServer = async () => { const stopHeartbeat = subscriptionHeartbeat(channelIds); const listener = streamFrom(channelIds, request, onSend, undefined, options.needsFiltering); + connectedChannels.labels({ type: 'websocket', channel: channelName }).inc(); + subscriptions[channelIds.join(';')] = { + channelName, listener, stopHeartbeat, }; @@ -1170,35 +1255,47 @@ const startServer = async () => { log.verbose(request.requestId, 'Subscription error:', err.toString()); socket.send(JSON.stringify({ error: err.toString() })); }); + } + + + const removeSubscription = (subscriptions, channelIds, request) => { + log.verbose(request.requestId, `Ending stream from ${channelIds.join(', ')} for ${request.accountId}`); + + const subscription = subscriptions[channelIds.join(';')]; + + if (!subscription) { + return; + } + + channelIds.forEach(channelId => { + unsubscribe(`${redisPrefix}${channelId}`, subscription.listener); + }); + + connectedChannels.labels({ type: 'websocket', channel: subscription.channelName }).dec(); + subscription.stopHeartbeat(); + + delete subscriptions[channelIds.join(';')]; + } /** * @param {WebSocketSession} session * @param {string} channelName * @param {StreamParams} params + * @returns {void} */ - const unsubscribeWebsocketFromChannel = ({ socket, request, subscriptions }, channelName, params) => + const unsubscribeWebsocketFromChannel = ({ socket, request, subscriptions }, channelName, params) => { channelNameToIds(request, channelName, params).then(({ channelIds }) => { - log.verbose(request.requestId, `Ending stream from ${channelIds.join(', ')} for ${request.accountId}`); - - const subscription = subscriptions[channelIds.join(';')]; - - if (!subscription) { - return; - } - - const { listener, stopHeartbeat } = subscription; - - channelIds.forEach(channelId => { - unsubscribe(`${redisPrefix}${channelId}`, listener); - }); - - stopHeartbeat(); - - delete subscriptions[channelIds.join(';')]; + removeSubscription(subscriptions, channelIds, request); }).catch(err => { - log.verbose(request.requestId, 'Unsubscription error:', err); - socket.send(JSON.stringify({ error: err.toString() })); + log.verbose(request.requestId, 'Unsubscribe error:', err); + + // If we have a socket that is alive and open still, send the error back to the client: + // FIXME: In other parts of the code ws === socket + if (socket.isAlive && socket.readyState === socket.OPEN) { + socket.send(JSON.stringify({ error: "Error unsubscribing from channel" })); + } }); + } /** * @param {WebSocketSession} session @@ -1219,16 +1316,20 @@ const startServer = async () => { subscribe(`${redisPrefix}${systemChannelId}`, listener); subscriptions[accessTokenChannelId] = { + channelName: 'system', listener, stopHeartbeat: () => { }, }; subscriptions[systemChannelId] = { + channelName: 'system', listener, stopHeartbeat: () => { }, }; + + connectedChannels.labels({ type: 'websocket', channel: 'system' }).inc(2); }; /** @@ -1255,6 +1356,8 @@ const startServer = async () => { ws.isAlive = true; }); + connectedClients.labels({ type: 'websocket' }).inc(); + /** * @type {WebSocketSession} */ @@ -1265,17 +1368,18 @@ const startServer = async () => { }; const onEnd = () => { - const keys = Object.keys(session.subscriptions); + const subscriptions = Object.keys(session.subscriptions); - keys.forEach(channelIds => { - const { listener, stopHeartbeat } = session.subscriptions[channelIds]; - - channelIds.split(';').forEach(channelId => { - unsubscribe(`${redisPrefix}${channelId}`, listener); - }); - - stopHeartbeat(); + subscriptions.forEach(channelIds => { + removeSubscription(session.subscriptions, channelIds.split(';'), req) }); + + // ensure garbage collection: + session.socket = null; + session.request = null; + session.subscriptions = {}; + + connectedClients.labels({ type: 'websocket' }).dec(); }; ws.on('close', onEnd); diff --git a/yarn.lock b/yarn.lock index df45395e0..e72012e06 100644 --- a/yarn.lock +++ b/yarn.lock @@ -3529,6 +3529,11 @@ bindings@^1.5.0: dependencies: file-uri-to-path "1.0.0" +bintrees@1.0.2: + version "1.0.2" + resolved "https://registry.yarnpkg.com/bintrees/-/bintrees-1.0.2.tgz#49f896d6e858a4a499df85c38fb399b9aff840f8" + integrity sha512-VOMgTMwjAaUG580SXn3LacVgjurrbMme7ZZNYGSSV7mmtY6QQRh0Eg3pwIcntQ77DErK1L0NxkbetjcoXzVwKw== + blueimp-load-image@^3.0.0: version "3.0.0" resolved "https://registry.yarnpkg.com/blueimp-load-image/-/blueimp-load-image-3.0.0.tgz#d71c39440a7d2f1a83e3e86a625e329116a51705" @@ -9659,6 +9664,13 @@ process@^0.11.10: resolved "https://registry.yarnpkg.com/process/-/process-0.11.10.tgz#7332300e840161bda3e69a1d1d91a7d4bc16f182" integrity sha512-cdGef/drWFoydD1JsMzuFf8100nZl+GT+yacc2bEced5f9Rjk4z+WtFUTBu9PhOi9j/jfmBPu0mMEY4wIdAF8A== +prom-client@^14.2.0: + version "14.2.0" + resolved "https://registry.yarnpkg.com/prom-client/-/prom-client-14.2.0.tgz#ca94504e64156f6506574c25fb1c34df7812cf11" + integrity sha512-sF308EhTenb/pDRPakm+WgiN+VdM/T1RaHj1x+MvAuT8UiQP8JmOEbxVqtkbfR4LrvOg5n7ic01kRBDGXjYikA== + dependencies: + tdigest "^0.1.1" + promise-inflight@^1.0.1: version "1.0.1" resolved "https://registry.yarnpkg.com/promise-inflight/-/promise-inflight-1.0.1.tgz#98472870bf228132fcbdd868129bad12c3c029e3" @@ -11562,6 +11574,13 @@ tar@^6.0.2: mkdirp "^1.0.3" yallist "^4.0.0" +tdigest@^0.1.1: + version "0.1.2" + resolved "https://registry.yarnpkg.com/tdigest/-/tdigest-0.1.2.tgz#96c64bac4ff10746b910b0e23b515794e12faced" + integrity sha512-+G0LLgjjo9BZX2MfdvPfH+MKLCrxlXSYec5DaPYP1fe6Iyhf0/fSmJ0bFiZ1F8BT6cGXl2LpltQptzjXKWEkKA== + dependencies: + bintrees "1.0.2" + temp-dir@^2.0.0: version "2.0.0" resolved "https://registry.yarnpkg.com/temp-dir/-/temp-dir-2.0.0.tgz#bde92b05bdfeb1516e804c9c00ad45177f31321e"