diff --git a/packages/backend/src/server/api/stream/channels/channel.ts b/packages/backend/src/server/api/stream/channels/channel.ts index 841c84af8e..5a0f709815 100644 --- a/packages/backend/src/server/api/stream/channels/channel.ts +++ b/packages/backend/src/server/api/stream/channels/channel.ts @@ -10,7 +10,7 @@ export default class extends Channel { public static shouldShare = false; public static requireCredential = false; private channelId: string; - private typers: Record = {}; + private typers: Map = new Map(); private emitTypersIntervalId: ReturnType; constructor(id: string, connection: Channel["connection"]) { @@ -44,8 +44,8 @@ export default class extends Channel { private onEvent(data: StreamMessages["channel"]["payload"]) { if (data.type === "typing") { const id = data.body; - const begin = this.typers[id] == null; - this.typers[id] = new Date(); + const begin = !this.typers.has(id); + this.typers.set(id, new Date()); if (begin) { this.emitTypers(); } @@ -58,10 +58,11 @@ export default class extends Channel { // Remove not typing users for (const [userId, date] of Object.entries(this.typers)) { if (now.getTime() - date.getTime() > 5000) - this.typers[userId] = undefined; + this.typers.delete(userId); } - const users = await Users.packMany(Object.keys(this.typers), null, { + const userIds = Array.from(this.typers.keys()); + const users = await Users.packMany(userIds, null, { detail: false, }); diff --git a/packages/backend/src/server/api/stream/channels/messaging.ts b/packages/backend/src/server/api/stream/channels/messaging.ts index dc18250592..f20b1ead18 100644 --- a/packages/backend/src/server/api/stream/channels/messaging.ts +++ b/packages/backend/src/server/api/stream/channels/messaging.ts @@ -20,7 +20,7 @@ export default class extends Channel { private subCh: | `messagingStream:${User["id"]}-${User["id"]}` | `messagingStream:${UserGroup["id"]}`; - private typers: Record = {}; + private typers: Map = new Map(); private emitTypersIntervalId: ReturnType; constructor(id: string, connection: Channel["connection"]) { @@ -66,8 +66,8 @@ export default class extends Channel { ) { if (data.type === "typing") { const id = data.body; - const begin = this.typers[id] == null; - this.typers[id] = new Date(); + const begin = !this.typers.has(id); + this.typers.set(id, new Date()); if (begin) { this.emitTypers(); } @@ -107,12 +107,13 @@ export default class extends Channel { const now = new Date(); // Remove not typing users - for (const [userId, date] of Object.entries(this.typers)) { + for (const [userId, date] of this.typers.entries()) { if (now.getTime() - date.getTime() > 5000) - this.typers[userId] = undefined; + this.typers.delete(userId); } - const users = await Users.packMany(Object.keys(this.typers), null, { + const userIds = Array.from(this.typers.keys()); + const users = await Users.packMany(userIds, null, { detail: false, }); diff --git a/packages/backend/src/server/api/stream/index.ts b/packages/backend/src/server/api/stream/index.ts index 061a17be2e..03cecf5429 100644 --- a/packages/backend/src/server/api/stream/index.ts +++ b/packages/backend/src/server/api/stream/index.ts @@ -42,7 +42,7 @@ export default class Connection { private wsConnection: websocket.connection; public subscriber: StreamEventEmitter; private channels: Channel[] = []; - private subscribingNotes: any = {}; + private subscribingNotes: Map = new Map(); private cachedNotes: Packed<"Note">[] = []; private isMastodonCompatible: boolean = false; private host: string; @@ -339,13 +339,10 @@ export default class Connection { private onSubscribeNote(payload: any) { if (!payload.id) return; - if (this.subscribingNotes[payload.id] == null) { - this.subscribingNotes[payload.id] = 0; - } + const current = this.subscribingNotes.get(payload.id) || 0; + this.subscribingNotes.set(payload.id, current + 1); - this.subscribingNotes[payload.id]++; - - if (this.subscribingNotes[payload.id] === 1) { + if (!current) { this.subscriber.on(`noteStream:${payload.id}`, this.onNoteStreamMessage); } } @@ -356,11 +353,13 @@ export default class Connection { private onUnsubscribeNote(payload: any) { if (!payload.id) return; - this.subscribingNotes[payload.id]--; - if (this.subscribingNotes[payload.id] <= 0) { - this.subscribingNotes[payload.id] = undefined; + const current = this.subscribingNotes.get(payload.id) || 0; + if (current <= 1) { + this.subscribingNotes.delete(payload.id); this.subscriber.off(`noteStream:${payload.id}`, this.onNoteStreamMessage); + return; } + this.subscribingNotes.set(payload.id, current - 1); } private async onNoteStreamMessage(data: StreamMessages["note"]["payload"]) { diff --git a/packages/backend/src/services/chart/core.ts b/packages/backend/src/services/chart/core.ts index 750a6e0ad8..207d8a93c3 100644 --- a/packages/backend/src/services/chart/core.ts +++ b/packages/backend/src/services/chart/core.ts @@ -17,6 +17,7 @@ import { } from "@/prelude/time.js"; import { getChartInsertLock } from "@/misc/app-lock.js"; import { db } from "@/db/postgre.js"; +import promiseLimit from "promise-limit"; const logger = new Logger("chart", "white", process.env.NODE_ENV !== "test"); @@ -472,7 +473,8 @@ export default abstract class Chart { protected commit(diff: Commit, group: string | null = null): void { for (const [k, v] of Object.entries(diff)) { if (v == null || v === 0 || (Array.isArray(v) && v.length === 0)) - diff[k] = undefined; + // rome-ignore lint/performance/noDelete: needs to be deleted not just set to undefined + delete diff[k]; } this.buffer.push({ diff, @@ -554,7 +556,7 @@ export default abstract class Chart { // bake unique count for (const [k, v] of Object.entries(finalDiffs)) { - if (this.schema[k].uniqueIncrement) { + if (this.schema[k].uniqueIncrement && Array.isArray(v) && v.length > 0) { const name = (columnPrefix + k.replaceAll(".", columnDot)) as keyof Columns; const tempColumnName = (uniqueTempColumnPrefix + @@ -646,16 +648,32 @@ export default abstract class Chart { ); }; - const groups = removeDuplicates(this.buffer.map((log) => log.group)); + const startCount = this.buffer.length; + const groups = removeDuplicates(this.buffer.map((log) => log.group)); + const groupCount = groups.length; + + // Limit the number of concurrent chart update queries executed on the database + // to 25 at a time, so as avoid excessive IO spinlocks like when 8k queries are + // sent out at once. + const limit = promiseLimit(25); + + const startTime = Date.now(); await Promise.all( - groups.map((group) => - Promise.all([ - this.claimCurrentLog(group, "hour"), - this.claimCurrentLog(group, "day"), - ]).then(([logHour, logDay]) => update(logHour, logDay)), + groups.map((group) => + limit(() => + Promise.all([ + this.claimCurrentLog(group, "hour"), + this.claimCurrentLog(group, "day"), + ]).then(([logHour, logDay]) => update(logHour, logDay)), + ), ), ); + + const duration = Date.now() - startTime; + logger.info( + `Saved ${startCount} (${groupCount} unique) ${this.name} items in ${duration}ms (${this.buffer.length} remaining)`, + ); } public async tick(