/* * SPDX-FileCopyrightText: syuilo and misskey-project * SPDX-License-Identifier: AGPL-3.0-only */ import { EventEmitter } from 'events'; import { Inject, Injectable } from '@nestjs/common'; import * as Redis from 'ioredis'; import * as WebSocket from 'ws'; import { DI } from '@/di-symbols.js'; import type { UsersRepository, MiAccessToken } from '@/models/_.js'; import { NoteReadService } from '@/core/NoteReadService.js'; import { NotificationService } from '@/core/NotificationService.js'; import { bindThis } from '@/decorators.js'; import { CacheService } from '@/core/CacheService.js'; import { MiLocalUser } from '@/models/User.js'; import { UserService } from '@/core/UserService.js'; import { ChannelFollowingService } from '@/core/ChannelFollowingService.js'; import { AuthenticateService, AuthenticationError } from './AuthenticateService.js'; import MainStreamConnection from './stream/Connection.js'; import { ChannelsService } from './stream/ChannelsService.js'; import { RateLimiterService } from './RateLimiterService.js'; import { RoleService } from '@/core/RoleService.js'; import { getIpHash } from '@/misc/get-ip-hash.js'; import proxyAddr from 'proxy-addr'; import ms from 'ms'; import type * as http from 'node:http'; import type { IEndpointMeta } from './endpoints.js'; import { LoggerService } from '@/core/LoggerService.js'; import type Logger from '@/logger.js'; @Injectable() export class StreamingApiServerService { #wss: WebSocket.WebSocketServer; #connections = new Map(); #cleanConnectionsIntervalId: NodeJS.Timeout | null = null; constructor( @Inject(DI.redisForSub) private redisForSub: Redis.Redis, @Inject(DI.usersRepository) private usersRepository: UsersRepository, private cacheService: CacheService, private noteReadService: NoteReadService, private authenticateService: AuthenticateService, private channelsService: ChannelsService, private notificationService: NotificationService, private usersService: UserService, private channelFollowingService: ChannelFollowingService, private rateLimiterService: RateLimiterService, private roleService: RoleService, private loggerService: LoggerService, ) { } @bindThis private async rateLimitThis( user: MiLocalUser | null | undefined, requestIp: string | undefined, limit: IEndpointMeta['limit'] & { key: NonNullable }, ) : Promise { let limitActor: string; if (user) { limitActor = user.id; } else { limitActor = getIpHash(requestIp || 'wtf'); } const factor = user ? (await this.roleService.getUserPolicies(user.id)).rateLimitFactor : 1; if (factor <= 0) return false; // Rate limit return await this.rateLimiterService.limit(limit, limitActor, factor) .then(() => { return false; }) .catch(err => { return true; }); } @bindThis public attach(server: http.Server): void { this.#wss = new WebSocket.WebSocketServer({ noServer: true, }); server.on('upgrade', async (request, socket, head) => { if (request.url == null) { socket.write('HTTP/1.1 400 Bad Request\r\n\r\n'); socket.destroy(); return; } // ServerServices sets `trustProxy: true`, which inside // fastify/request.js ends up calling `proxyAddr` in this way, // so we do the same const requestIp = proxyAddr(request, () => { return true; } ); if (await this.rateLimitThis(null, requestIp, { key: 'wsconnect', duration: ms('5min'), max: 32, })) { socket.write('HTTP/1.1 429 Rate Limit Exceeded\r\n\r\n'); socket.destroy(); return; } const q = new URL(request.url, `http://${request.headers.host}`).searchParams; let user: MiLocalUser | null = null; let app: MiAccessToken | null = null; // https://datatracker.ietf.org/doc/html/rfc6750.html#section-2.1 // Note that the standard WHATWG WebSocket API does not support setting any headers, // but non-browser apps may still be able to set it. const token = request.headers.authorization?.startsWith('Bearer ') ? request.headers.authorization.slice(7) : q.get('i'); try { [user, app] = await this.authenticateService.authenticate(token); if (app !== null && !app.permission.some(p => p === 'read:account')) { throw new AuthenticationError('Your app does not have necessary permissions to use websocket API.'); } } catch (e) { if (e instanceof AuthenticationError) { socket.write([ 'HTTP/1.1 401 Unauthorized', 'WWW-Authenticate: Bearer realm="Misskey", error="invalid_token", error_description="Failed to authenticate"', ].join('\r\n') + '\r\n\r\n'); } else { socket.write('HTTP/1.1 500 Internal Server Error\r\n\r\n'); } socket.destroy(); return; } if (user?.isSuspended) { socket.write('HTTP/1.1 403 Forbidden\r\n\r\n'); socket.destroy(); return; } const rateLimiter = () => { // rather high limit, because when catching up at the top of a // timeline, the frontend may render many many notes, each of // which causes a message via `useNoteCapture` to ask for // realtime updates of that note return this.rateLimitThis(user, requestIp, { key: 'wsmessage', duration: ms('2sec'), max: 4096, }); }; const stream = new MainStreamConnection( this.channelsService, this.noteReadService, this.notificationService, this.cacheService, this.channelFollowingService, this.loggerService, user, app, requestIp, rateLimiter, ); await stream.init(); this.#wss.handleUpgrade(request, socket, head, (ws) => { this.#wss.emit('connection', ws, request, { stream, user, app, }); }); }); const globalEv = new EventEmitter(); this.redisForSub.on('message', (_: string, data: string) => { const parsed = JSON.parse(data); globalEv.emit('message', parsed); }); this.#wss.on('connection', async (connection: WebSocket.WebSocket, request: http.IncomingMessage, ctx: { stream: MainStreamConnection, user: MiLocalUser | null; app: MiAccessToken | null }) => { const { stream, user, app } = ctx; const ev = new EventEmitter(); function onRedisMessage(data: any): void { ev.emit(data.channel, data.message); } globalEv.on('message', onRedisMessage); await stream.listen(ev, connection); this.#connections.set(connection, Date.now()); const userUpdateIntervalId = user ? setInterval(() => { this.usersService.updateLastActiveDate(user); }, 1000 * 60 * 5) : null; if (user) { this.usersService.updateLastActiveDate(user); } connection.once('close', () => { ev.removeAllListeners(); stream.dispose(); globalEv.off('message', onRedisMessage); this.#connections.delete(connection); if (userUpdateIntervalId) clearInterval(userUpdateIntervalId); }); connection.on('pong', () => { this.#connections.set(connection, Date.now()); }); }); // 一定期間通信が無いコネクションは実際には切断されている可能性があるため定期的にterminateする this.#cleanConnectionsIntervalId = setInterval(() => { const now = Date.now(); for (const [connection, lastActive] of this.#connections.entries()) { if (now - lastActive > 1000 * 60 * 2) { connection.terminate(); this.#connections.delete(connection); } else { connection.ping(); } } }, 1000 * 60); } @bindThis public detach(): Promise { if (this.#cleanConnectionsIntervalId) { clearInterval(this.#cleanConnectionsIntervalId); this.#cleanConnectionsIntervalId = null; } return new Promise((resolve) => { this.#wss.close(() => resolve()); }); } }