Merge pull request 'Improve performance' (#9794) from supakaity/hajkey:hk/delete-fixes into develop
Reviewed-on: https://codeberg.org/calckey/calckey/pulls/9794
This commit is contained in:
commit
6ebd25d874
4 changed files with 48 additions and 29 deletions
|
@ -10,7 +10,7 @@ export default class extends Channel {
|
||||||
public static shouldShare = false;
|
public static shouldShare = false;
|
||||||
public static requireCredential = false;
|
public static requireCredential = false;
|
||||||
private channelId: string;
|
private channelId: string;
|
||||||
private typers: Record<User["id"], Date> = {};
|
private typers: Map<User["id"], Date> = new Map();
|
||||||
private emitTypersIntervalId: ReturnType<typeof setInterval>;
|
private emitTypersIntervalId: ReturnType<typeof setInterval>;
|
||||||
|
|
||||||
constructor(id: string, connection: Channel["connection"]) {
|
constructor(id: string, connection: Channel["connection"]) {
|
||||||
|
@ -44,8 +44,8 @@ export default class extends Channel {
|
||||||
private onEvent(data: StreamMessages["channel"]["payload"]) {
|
private onEvent(data: StreamMessages["channel"]["payload"]) {
|
||||||
if (data.type === "typing") {
|
if (data.type === "typing") {
|
||||||
const id = data.body;
|
const id = data.body;
|
||||||
const begin = this.typers[id] == null;
|
const begin = !this.typers.has(id);
|
||||||
this.typers[id] = new Date();
|
this.typers.set(id, new Date());
|
||||||
if (begin) {
|
if (begin) {
|
||||||
this.emitTypers();
|
this.emitTypers();
|
||||||
}
|
}
|
||||||
|
@ -58,10 +58,11 @@ export default class extends Channel {
|
||||||
// Remove not typing users
|
// Remove not typing users
|
||||||
for (const [userId, date] of Object.entries(this.typers)) {
|
for (const [userId, date] of Object.entries(this.typers)) {
|
||||||
if (now.getTime() - date.getTime() > 5000)
|
if (now.getTime() - date.getTime() > 5000)
|
||||||
this.typers[userId] = undefined;
|
this.typers.delete(userId);
|
||||||
}
|
}
|
||||||
|
|
||||||
const users = await Users.packMany(Object.keys(this.typers), null, {
|
const userIds = Array.from(this.typers.keys());
|
||||||
|
const users = await Users.packMany(userIds, null, {
|
||||||
detail: false,
|
detail: false,
|
||||||
});
|
});
|
||||||
|
|
||||||
|
|
|
@ -20,7 +20,7 @@ export default class extends Channel {
|
||||||
private subCh:
|
private subCh:
|
||||||
| `messagingStream:${User["id"]}-${User["id"]}`
|
| `messagingStream:${User["id"]}-${User["id"]}`
|
||||||
| `messagingStream:${UserGroup["id"]}`;
|
| `messagingStream:${UserGroup["id"]}`;
|
||||||
private typers: Record<User["id"], Date> = {};
|
private typers: Map<User["id"], Date> = new Map();
|
||||||
private emitTypersIntervalId: ReturnType<typeof setInterval>;
|
private emitTypersIntervalId: ReturnType<typeof setInterval>;
|
||||||
|
|
||||||
constructor(id: string, connection: Channel["connection"]) {
|
constructor(id: string, connection: Channel["connection"]) {
|
||||||
|
@ -66,8 +66,8 @@ export default class extends Channel {
|
||||||
) {
|
) {
|
||||||
if (data.type === "typing") {
|
if (data.type === "typing") {
|
||||||
const id = data.body;
|
const id = data.body;
|
||||||
const begin = this.typers[id] == null;
|
const begin = !this.typers.has(id);
|
||||||
this.typers[id] = new Date();
|
this.typers.set(id, new Date());
|
||||||
if (begin) {
|
if (begin) {
|
||||||
this.emitTypers();
|
this.emitTypers();
|
||||||
}
|
}
|
||||||
|
@ -107,12 +107,13 @@ export default class extends Channel {
|
||||||
const now = new Date();
|
const now = new Date();
|
||||||
|
|
||||||
// Remove not typing users
|
// Remove not typing users
|
||||||
for (const [userId, date] of Object.entries(this.typers)) {
|
for (const [userId, date] of this.typers.entries()) {
|
||||||
if (now.getTime() - date.getTime() > 5000)
|
if (now.getTime() - date.getTime() > 5000)
|
||||||
this.typers[userId] = undefined;
|
this.typers.delete(userId);
|
||||||
}
|
}
|
||||||
|
|
||||||
const users = await Users.packMany(Object.keys(this.typers), null, {
|
const userIds = Array.from(this.typers.keys());
|
||||||
|
const users = await Users.packMany(userIds, null, {
|
||||||
detail: false,
|
detail: false,
|
||||||
});
|
});
|
||||||
|
|
||||||
|
|
|
@ -42,7 +42,7 @@ export default class Connection {
|
||||||
private wsConnection: websocket.connection;
|
private wsConnection: websocket.connection;
|
||||||
public subscriber: StreamEventEmitter;
|
public subscriber: StreamEventEmitter;
|
||||||
private channels: Channel[] = [];
|
private channels: Channel[] = [];
|
||||||
private subscribingNotes: any = {};
|
private subscribingNotes: Map<string, number> = new Map();
|
||||||
private cachedNotes: Packed<"Note">[] = [];
|
private cachedNotes: Packed<"Note">[] = [];
|
||||||
private isMastodonCompatible: boolean = false;
|
private isMastodonCompatible: boolean = false;
|
||||||
private host: string;
|
private host: string;
|
||||||
|
@ -339,13 +339,10 @@ export default class Connection {
|
||||||
private onSubscribeNote(payload: any) {
|
private onSubscribeNote(payload: any) {
|
||||||
if (!payload.id) return;
|
if (!payload.id) return;
|
||||||
|
|
||||||
if (this.subscribingNotes[payload.id] == null) {
|
const current = this.subscribingNotes.get(payload.id) || 0;
|
||||||
this.subscribingNotes[payload.id] = 0;
|
this.subscribingNotes.set(payload.id, current + 1);
|
||||||
}
|
|
||||||
|
|
||||||
this.subscribingNotes[payload.id]++;
|
if (!current) {
|
||||||
|
|
||||||
if (this.subscribingNotes[payload.id] === 1) {
|
|
||||||
this.subscriber.on(`noteStream:${payload.id}`, this.onNoteStreamMessage);
|
this.subscriber.on(`noteStream:${payload.id}`, this.onNoteStreamMessage);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -356,11 +353,13 @@ export default class Connection {
|
||||||
private onUnsubscribeNote(payload: any) {
|
private onUnsubscribeNote(payload: any) {
|
||||||
if (!payload.id) return;
|
if (!payload.id) return;
|
||||||
|
|
||||||
this.subscribingNotes[payload.id]--;
|
const current = this.subscribingNotes.get(payload.id) || 0;
|
||||||
if (this.subscribingNotes[payload.id] <= 0) {
|
if (current <= 1) {
|
||||||
this.subscribingNotes[payload.id] = undefined;
|
this.subscribingNotes.delete(payload.id);
|
||||||
this.subscriber.off(`noteStream:${payload.id}`, this.onNoteStreamMessage);
|
this.subscriber.off(`noteStream:${payload.id}`, this.onNoteStreamMessage);
|
||||||
|
return;
|
||||||
}
|
}
|
||||||
|
this.subscribingNotes.set(payload.id, current - 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
private async onNoteStreamMessage(data: StreamMessages["note"]["payload"]) {
|
private async onNoteStreamMessage(data: StreamMessages["note"]["payload"]) {
|
||||||
|
|
|
@ -17,6 +17,7 @@ import {
|
||||||
} from "@/prelude/time.js";
|
} from "@/prelude/time.js";
|
||||||
import { getChartInsertLock } from "@/misc/app-lock.js";
|
import { getChartInsertLock } from "@/misc/app-lock.js";
|
||||||
import { db } from "@/db/postgre.js";
|
import { db } from "@/db/postgre.js";
|
||||||
|
import promiseLimit from "promise-limit";
|
||||||
|
|
||||||
const logger = new Logger("chart", "white", process.env.NODE_ENV !== "test");
|
const logger = new Logger("chart", "white", process.env.NODE_ENV !== "test");
|
||||||
|
|
||||||
|
@ -472,7 +473,8 @@ export default abstract class Chart<T extends Schema> {
|
||||||
protected commit(diff: Commit<T>, group: string | null = null): void {
|
protected commit(diff: Commit<T>, group: string | null = null): void {
|
||||||
for (const [k, v] of Object.entries(diff)) {
|
for (const [k, v] of Object.entries(diff)) {
|
||||||
if (v == null || v === 0 || (Array.isArray(v) && v.length === 0))
|
if (v == null || v === 0 || (Array.isArray(v) && v.length === 0))
|
||||||
diff[k] = undefined;
|
// rome-ignore lint/performance/noDelete: needs to be deleted not just set to undefined
|
||||||
|
delete diff[k];
|
||||||
}
|
}
|
||||||
this.buffer.push({
|
this.buffer.push({
|
||||||
diff,
|
diff,
|
||||||
|
@ -554,7 +556,7 @@ export default abstract class Chart<T extends Schema> {
|
||||||
|
|
||||||
// bake unique count
|
// bake unique count
|
||||||
for (const [k, v] of Object.entries(finalDiffs)) {
|
for (const [k, v] of Object.entries(finalDiffs)) {
|
||||||
if (this.schema[k].uniqueIncrement) {
|
if (this.schema[k].uniqueIncrement && Array.isArray(v) && v.length > 0) {
|
||||||
const name = (columnPrefix +
|
const name = (columnPrefix +
|
||||||
k.replaceAll(".", columnDot)) as keyof Columns<T>;
|
k.replaceAll(".", columnDot)) as keyof Columns<T>;
|
||||||
const tempColumnName = (uniqueTempColumnPrefix +
|
const tempColumnName = (uniqueTempColumnPrefix +
|
||||||
|
@ -646,15 +648,31 @@ export default abstract class Chart<T extends Schema> {
|
||||||
);
|
);
|
||||||
};
|
};
|
||||||
|
|
||||||
const groups = removeDuplicates(this.buffer.map((log) => log.group));
|
const startCount = this.buffer.length;
|
||||||
|
|
||||||
|
const groups = removeDuplicates(this.buffer.map((log) => log.group));
|
||||||
|
const groupCount = groups.length;
|
||||||
|
|
||||||
|
// Limit the number of concurrent chart update queries executed on the database
|
||||||
|
// to 25 at a time, so as avoid excessive IO spinlocks like when 8k queries are
|
||||||
|
// sent out at once.
|
||||||
|
const limit = promiseLimit(25);
|
||||||
|
|
||||||
|
const startTime = Date.now();
|
||||||
await Promise.all(
|
await Promise.all(
|
||||||
groups.map((group) =>
|
groups.map((group) =>
|
||||||
|
limit(() =>
|
||||||
Promise.all([
|
Promise.all([
|
||||||
this.claimCurrentLog(group, "hour"),
|
this.claimCurrentLog(group, "hour"),
|
||||||
this.claimCurrentLog(group, "day"),
|
this.claimCurrentLog(group, "day"),
|
||||||
]).then(([logHour, logDay]) => update(logHour, logDay)),
|
]).then(([logHour, logDay]) => update(logHour, logDay)),
|
||||||
),
|
),
|
||||||
|
),
|
||||||
|
);
|
||||||
|
|
||||||
|
const duration = Date.now() - startTime;
|
||||||
|
logger.info(
|
||||||
|
`Saved ${startCount} (${groupCount} unique) ${this.name} items in ${duration}ms (${this.buffer.length} remaining)`,
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue