Improve performance of charts
Fix some undefined !== deleted issues
This commit is contained in:
parent
0e8fe41aaa
commit
b96fe57793
4 changed files with 48 additions and 29 deletions
|
@ -10,7 +10,7 @@ export default class extends Channel {
|
|||
public static shouldShare = false;
|
||||
public static requireCredential = false;
|
||||
private channelId: string;
|
||||
private typers: Record<User["id"], Date> = {};
|
||||
private typers: Map<User["id"], Date> = new Map();
|
||||
private emitTypersIntervalId: ReturnType<typeof setInterval>;
|
||||
|
||||
constructor(id: string, connection: Channel["connection"]) {
|
||||
|
@ -44,8 +44,8 @@ export default class extends Channel {
|
|||
private onEvent(data: StreamMessages["channel"]["payload"]) {
|
||||
if (data.type === "typing") {
|
||||
const id = data.body;
|
||||
const begin = this.typers[id] == null;
|
||||
this.typers[id] = new Date();
|
||||
const begin = !this.typers.has(id);
|
||||
this.typers.set(id, new Date());
|
||||
if (begin) {
|
||||
this.emitTypers();
|
||||
}
|
||||
|
@ -58,10 +58,11 @@ export default class extends Channel {
|
|||
// Remove not typing users
|
||||
for (const [userId, date] of Object.entries(this.typers)) {
|
||||
if (now.getTime() - date.getTime() > 5000)
|
||||
this.typers[userId] = undefined;
|
||||
this.typers.delete(userId);
|
||||
}
|
||||
|
||||
const users = await Users.packMany(Object.keys(this.typers), null, {
|
||||
const keys = Array.from(this.typers.keys());
|
||||
const users = await Users.packMany(keys, null, {
|
||||
detail: false,
|
||||
});
|
||||
|
||||
|
|
|
@ -20,7 +20,7 @@ export default class extends Channel {
|
|||
private subCh:
|
||||
| `messagingStream:${User["id"]}-${User["id"]}`
|
||||
| `messagingStream:${UserGroup["id"]}`;
|
||||
private typers: Record<User["id"], Date> = {};
|
||||
private typers: Map<User["id"], Date> = new Map();
|
||||
private emitTypersIntervalId: ReturnType<typeof setInterval>;
|
||||
|
||||
constructor(id: string, connection: Channel["connection"]) {
|
||||
|
@ -66,8 +66,8 @@ export default class extends Channel {
|
|||
) {
|
||||
if (data.type === "typing") {
|
||||
const id = data.body;
|
||||
const begin = this.typers[id] == null;
|
||||
this.typers[id] = new Date();
|
||||
const begin = !this.typers.has(id);
|
||||
this.typers.set(id, new Date());
|
||||
if (begin) {
|
||||
this.emitTypers();
|
||||
}
|
||||
|
@ -107,12 +107,13 @@ export default class extends Channel {
|
|||
const now = new Date();
|
||||
|
||||
// Remove not typing users
|
||||
for (const [userId, date] of Object.entries(this.typers)) {
|
||||
for (const [userId, date] of this.typers.entries()) {
|
||||
if (now.getTime() - date.getTime() > 5000)
|
||||
this.typers[userId] = undefined;
|
||||
this.typers.delete(userId);
|
||||
}
|
||||
|
||||
const users = await Users.packMany(Object.keys(this.typers), null, {
|
||||
const ids = Array.from(this.typers.keys());
|
||||
const users = await Users.packMany(ids, null, {
|
||||
detail: false,
|
||||
});
|
||||
|
||||
|
|
|
@ -42,7 +42,7 @@ export default class Connection {
|
|||
private wsConnection: websocket.connection;
|
||||
public subscriber: StreamEventEmitter;
|
||||
private channels: Channel[] = [];
|
||||
private subscribingNotes: any = {};
|
||||
private subscribingNotes: Map<string, number> = new Map();
|
||||
private cachedNotes: Packed<"Note">[] = [];
|
||||
private isMastodonCompatible: boolean = false;
|
||||
private host: string;
|
||||
|
@ -339,13 +339,10 @@ export default class Connection {
|
|||
private onSubscribeNote(payload: any) {
|
||||
if (!payload.id) return;
|
||||
|
||||
if (this.subscribingNotes[payload.id] == null) {
|
||||
this.subscribingNotes[payload.id] = 0;
|
||||
}
|
||||
const c = this.subscribingNotes.get(payload.id) || 0;
|
||||
this.subscribingNotes.set(payload.id, c + 1);
|
||||
|
||||
this.subscribingNotes[payload.id]++;
|
||||
|
||||
if (this.subscribingNotes[payload.id] === 1) {
|
||||
if (!c) {
|
||||
this.subscriber.on(`noteStream:${payload.id}`, this.onNoteStreamMessage);
|
||||
}
|
||||
}
|
||||
|
@ -356,11 +353,13 @@ export default class Connection {
|
|||
private onUnsubscribeNote(payload: any) {
|
||||
if (!payload.id) return;
|
||||
|
||||
this.subscribingNotes[payload.id]--;
|
||||
if (this.subscribingNotes[payload.id] <= 0) {
|
||||
this.subscribingNotes[payload.id] = undefined;
|
||||
const c = this.subscribingNotes.get(payload.id) || 0;
|
||||
if (c <= 1) {
|
||||
this.subscribingNotes.delete(payload.id);
|
||||
this.subscriber.off(`noteStream:${payload.id}`, this.onNoteStreamMessage);
|
||||
return;
|
||||
}
|
||||
this.subscribingNotes.set(payload.id, c - 1);
|
||||
}
|
||||
|
||||
private async onNoteStreamMessage(data: StreamMessages["note"]["payload"]) {
|
||||
|
|
|
@ -17,6 +17,7 @@ import {
|
|||
} from "@/prelude/time.js";
|
||||
import { getChartInsertLock } from "@/misc/app-lock.js";
|
||||
import { db } from "@/db/postgre.js";
|
||||
import promiseLimit from "promise-limit";
|
||||
|
||||
const logger = new Logger("chart", "white", process.env.NODE_ENV !== "test");
|
||||
|
||||
|
@ -472,7 +473,8 @@ export default abstract class Chart<T extends Schema> {
|
|||
protected commit(diff: Commit<T>, group: string | null = null): void {
|
||||
for (const [k, v] of Object.entries(diff)) {
|
||||
if (v == null || v === 0 || (Array.isArray(v) && v.length === 0))
|
||||
diff[k] = undefined;
|
||||
// rome-ignore lint/performance/noDelete: needs to be deleted not just set to undefined
|
||||
delete diff[k];
|
||||
}
|
||||
this.buffer.push({
|
||||
diff,
|
||||
|
@ -554,7 +556,7 @@ export default abstract class Chart<T extends Schema> {
|
|||
|
||||
// bake unique count
|
||||
for (const [k, v] of Object.entries(finalDiffs)) {
|
||||
if (this.schema[k].uniqueIncrement) {
|
||||
if (this.schema[k].uniqueIncrement && Array.isArray(v) && v.length > 0) {
|
||||
const name = (columnPrefix +
|
||||
k.replaceAll(".", columnDot)) as keyof Columns<T>;
|
||||
const tempColumnName = (uniqueTempColumnPrefix +
|
||||
|
@ -646,16 +648,32 @@ export default abstract class Chart<T extends Schema> {
|
|||
);
|
||||
};
|
||||
|
||||
const groups = removeDuplicates(this.buffer.map((log) => log.group));
|
||||
const startCount = this.buffer.length;
|
||||
|
||||
const groups = removeDuplicates(this.buffer.map((log) => log.group));
|
||||
const groupCount = groups.length;
|
||||
|
||||
// Limit the number of concurrent chart update queries executed on the database
|
||||
// to 25 at a time, so as avoid excessive IO spinlocks like when 8k queries are
|
||||
// sent out at once.
|
||||
const limit = promiseLimit(25);
|
||||
|
||||
const startTime = Date.now();
|
||||
await Promise.all(
|
||||
groups.map((group) =>
|
||||
Promise.all([
|
||||
this.claimCurrentLog(group, "hour"),
|
||||
this.claimCurrentLog(group, "day"),
|
||||
]).then(([logHour, logDay]) => update(logHour, logDay)),
|
||||
groups.map((group) =>
|
||||
limit(() =>
|
||||
Promise.all([
|
||||
this.claimCurrentLog(group, "hour"),
|
||||
this.claimCurrentLog(group, "day"),
|
||||
]).then(([logHour, logDay]) => update(logHour, logDay)),
|
||||
),
|
||||
),
|
||||
);
|
||||
|
||||
const duration = Date.now() - startTime;
|
||||
logger.info(
|
||||
`Saved ${startCount} (${groupCount} unique) ${this.name} items in ${duration}ms (${this.buffer.length} remaining)`,
|
||||
);
|
||||
}
|
||||
|
||||
public async tick(
|
||||
|
|
Loading…
Reference in a new issue