perf: query scylladb in reactions endpoint
This commit is contained in:
parent
63c445554a
commit
924a8f9904
6 changed files with 93 additions and 68 deletions
|
@ -1,3 +1,4 @@
|
||||||
|
DROP MATERIALIZED VIEW IF EXISTS reaction_by_id;
|
||||||
DROP MATERIALIZED VIEW IF EXISTS reaction_by_userid;
|
DROP MATERIALIZED VIEW IF EXISTS reaction_by_userid;
|
||||||
DROP INDEX IF EXISTS reaction_by_id;
|
DROP INDEX IF EXISTS reaction_by_id;
|
||||||
DROP TABLE IF EXISTS reaction;
|
DROP TABLE IF EXISTS reaction;
|
||||||
|
|
|
@ -99,11 +99,9 @@ CREATE TABLE IF NOT EXISTS reaction (
|
||||||
"reaction" text,
|
"reaction" text,
|
||||||
"emoji" frozen<emoji>,
|
"emoji" frozen<emoji>,
|
||||||
"createdAt" timestamp,
|
"createdAt" timestamp,
|
||||||
PRIMARY KEY ("noteId", "userId")
|
PRIMARY KEY ("noteId", "userId") -- this key constraints one reaction per user for the same post
|
||||||
);
|
);
|
||||||
|
|
||||||
CREATE INDEX IF NOT EXISTS reaction_by_id ON reaction ("id");
|
|
||||||
|
|
||||||
CREATE MATERIALIZED VIEW IF NOT EXISTS reaction_by_userid AS
|
CREATE MATERIALIZED VIEW IF NOT EXISTS reaction_by_userid AS
|
||||||
SELECT * FROM reaction
|
SELECT * FROM reaction
|
||||||
WHERE "userId" IS NOT NULL
|
WHERE "userId" IS NOT NULL
|
||||||
|
@ -111,3 +109,10 @@ CREATE MATERIALIZED VIEW IF NOT EXISTS reaction_by_userid AS
|
||||||
AND "noteId" IS NOT NULL
|
AND "noteId" IS NOT NULL
|
||||||
PRIMARY KEY ("userId", "createdAt", "noteId")
|
PRIMARY KEY ("userId", "createdAt", "noteId")
|
||||||
WITH CLUSTERING ORDER BY ("createdAt" DESC);
|
WITH CLUSTERING ORDER BY ("createdAt" DESC);
|
||||||
|
|
||||||
|
CREATE MATERIALIZED VIEW IF NOT EXISTS reaction_by_id AS
|
||||||
|
SELECT * FROM reaction
|
||||||
|
WHERE "noteId" IS NOT NULL
|
||||||
|
AND "reaction" IS NOT NULL
|
||||||
|
AND "userId" IS NOT NULL
|
||||||
|
PRIMARY KEY ("noteId", "reaction", "userId");
|
||||||
|
|
|
@ -135,7 +135,7 @@ export const prepared = {
|
||||||
("id", "noteId", "userId", "reaction", "emoji", "createdAt")
|
("id", "noteId", "userId", "reaction", "emoji", "createdAt")
|
||||||
VALUES (?, ?, ?, ?, ?, ?)`,
|
VALUES (?, ?, ?, ?, ?, ?)`,
|
||||||
select: {
|
select: {
|
||||||
byNoteId: `SELECT * FROM reaction WHERE "noteId" IN ?`,
|
byNoteId: `SELECT * FROM reaction_by_id WHERE "noteId" IN ?`,
|
||||||
byUserId: `SELECT * FROM reaction_by_userid WHERE "userId" IN ?`,
|
byUserId: `SELECT * FROM reaction_by_userid WHERE "userId" IN ?`,
|
||||||
byNoteAndUser: `SELECT * FROM reaction WHERE "noteId" IN ? AND "userId" IN ?`,
|
byNoteAndUser: `SELECT * FROM reaction WHERE "noteId" IN ? AND "userId" IN ?`,
|
||||||
byId: `SELECT * FROM reaction WHERE "id" IN ?`,
|
byId: `SELECT * FROM reaction WHERE "id" IN ?`,
|
||||||
|
@ -274,7 +274,7 @@ export function prepareTimelineQuery(ps: {
|
||||||
queryParts.push(`AND "createdAt" > ?`);
|
queryParts.push(`AND "createdAt" > ?`);
|
||||||
}
|
}
|
||||||
|
|
||||||
queryParts.push("LIMIT 50"); // Hardcoded to issue a prepared query
|
queryParts.push("LIMIT ?");
|
||||||
const query = queryParts.join(" ");
|
const query = queryParts.join(" ");
|
||||||
|
|
||||||
return {
|
return {
|
||||||
|
@ -304,10 +304,11 @@ export async function execTimelineQuery(
|
||||||
|
|
||||||
// Try to get posts of at most <maxDays> in the single request
|
// Try to get posts of at most <maxDays> in the single request
|
||||||
while (foundNotes.length < ps.limit && scannedEmptyPartitions < maxDays) {
|
while (foundNotes.length < ps.limit && scannedEmptyPartitions < maxDays) {
|
||||||
const params: (Date | string | string[])[] = [untilDate, untilDate];
|
const params: (Date | string | string[] | number)[] = [untilDate, untilDate];
|
||||||
if (sinceDate) {
|
if (sinceDate) {
|
||||||
params.push(sinceDate);
|
params.push(sinceDate);
|
||||||
}
|
}
|
||||||
|
params.push(ps.limit)
|
||||||
|
|
||||||
const result = await scyllaClient.execute(query, params, {
|
const result = await scyllaClient.execute(query, params, {
|
||||||
prepare: true,
|
prepare: true,
|
||||||
|
|
|
@ -1,7 +1,8 @@
|
||||||
import { db } from "@/db/postgre.js";
|
import { db } from "@/db/postgre.js";
|
||||||
import { Meta } from "@/models/entities/meta.js";
|
import { Meta } from "@/models/entities/meta.js";
|
||||||
|
import { Cache } from "@/misc/cache.js";
|
||||||
|
|
||||||
let cache: Meta;
|
export const metaCache = new Cache<Meta>("meta", 10);
|
||||||
|
|
||||||
export function metaToPugArgs(meta: Meta): object {
|
export function metaToPugArgs(meta: Meta): object {
|
||||||
let motd = ["Loading..."];
|
let motd = ["Loading..."];
|
||||||
|
@ -30,43 +31,38 @@ export function metaToPugArgs(meta: Meta): object {
|
||||||
}
|
}
|
||||||
|
|
||||||
export async function fetchMeta(noCache = false): Promise<Meta> {
|
export async function fetchMeta(noCache = false): Promise<Meta> {
|
||||||
if (!noCache && cache) return cache;
|
const fetcher = () =>
|
||||||
|
db.transaction(async (transactionalEntityManager) => {
|
||||||
|
// New IDs are prioritized because multiple records may have been created due to past bugs.
|
||||||
|
const metas = await transactionalEntityManager.find(Meta, {
|
||||||
|
order: {
|
||||||
|
id: "DESC",
|
||||||
|
},
|
||||||
|
});
|
||||||
|
|
||||||
return await db.transaction(async (transactionalEntityManager) => {
|
if (metas.length > 0) {
|
||||||
// New IDs are prioritized because multiple records may have been created due to past bugs.
|
return metas[0];
|
||||||
const metas = await transactionalEntityManager.find(Meta, {
|
} else {
|
||||||
order: {
|
// If fetchMeta is called at the same time when meta is empty, this part may be called at the same time, so use fail-safe upsert.
|
||||||
id: "DESC",
|
const saved = await transactionalEntityManager
|
||||||
},
|
.upsert(
|
||||||
|
Meta,
|
||||||
|
{
|
||||||
|
id: "x",
|
||||||
|
},
|
||||||
|
["id"],
|
||||||
|
)
|
||||||
|
.then((x) =>
|
||||||
|
transactionalEntityManager.findOneByOrFail(Meta, x.identifiers[0]),
|
||||||
|
);
|
||||||
|
|
||||||
|
return saved;
|
||||||
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
const meta = metas[0];
|
if (noCache) {
|
||||||
|
return await fetcher();
|
||||||
|
}
|
||||||
|
|
||||||
if (meta) {
|
return await metaCache.fetch(null, fetcher);
|
||||||
cache = meta;
|
|
||||||
return meta;
|
|
||||||
} else {
|
|
||||||
// If fetchMeta is called at the same time when meta is empty, this part may be called at the same time, so use fail-safe upsert.
|
|
||||||
const saved = await transactionalEntityManager
|
|
||||||
.upsert(
|
|
||||||
Meta,
|
|
||||||
{
|
|
||||||
id: "x",
|
|
||||||
},
|
|
||||||
["id"],
|
|
||||||
)
|
|
||||||
.then((x) =>
|
|
||||||
transactionalEntityManager.findOneByOrFail(Meta, x.identifiers[0]),
|
|
||||||
);
|
|
||||||
|
|
||||||
cache = saved;
|
|
||||||
return saved;
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
|
|
||||||
setInterval(() => {
|
|
||||||
fetchMeta(true).then((meta) => {
|
|
||||||
cache = meta;
|
|
||||||
});
|
|
||||||
}, 1000 * 10);
|
|
||||||
|
|
|
@ -2,6 +2,8 @@ import { Meta } from "@/models/entities/meta.js";
|
||||||
import { insertModerationLog } from "@/services/insert-moderation-log.js";
|
import { insertModerationLog } from "@/services/insert-moderation-log.js";
|
||||||
import { db } from "@/db/postgre.js";
|
import { db } from "@/db/postgre.js";
|
||||||
import define from "../../define.js";
|
import define from "../../define.js";
|
||||||
|
import { Metas } from "@/models/index.js";
|
||||||
|
import { metaCache } from "@/misc/fetch-meta.js";
|
||||||
|
|
||||||
export const meta = {
|
export const meta = {
|
||||||
tags: ["admin"],
|
tags: ["admin"],
|
||||||
|
@ -594,21 +596,17 @@ export default define(meta, paramDef, async (ps, me) => {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
await db.transaction(async (transactionalEntityManager) => {
|
const metas = await Metas.find({ order: { id: "DESC" }});
|
||||||
const metas = await transactionalEntityManager.find(Meta, {
|
let newMeta: Meta;
|
||||||
order: {
|
if (metas.length > 0) {
|
||||||
id: "DESC",
|
|
||||||
},
|
|
||||||
});
|
|
||||||
|
|
||||||
const meta = metas[0];
|
const meta = metas[0];
|
||||||
|
await Metas.update(meta.id, set);
|
||||||
if (meta) {
|
newMeta = {...meta, ...set};
|
||||||
await transactionalEntityManager.update(Meta, meta.id, set);
|
} else {
|
||||||
} else {
|
await Metas.save(set);
|
||||||
await transactionalEntityManager.save(Meta, set);
|
newMeta = set as Meta;
|
||||||
}
|
}
|
||||||
});
|
await metaCache.set(null, newMeta);
|
||||||
|
|
||||||
insertModerationLog(me, "updateMeta");
|
insertModerationLog(me, "updateMeta");
|
||||||
});
|
});
|
||||||
|
|
|
@ -4,6 +4,7 @@ import type { NoteReaction } from "@/models/entities/note-reaction.js";
|
||||||
import define from "../../define.js";
|
import define from "../../define.js";
|
||||||
import { ApiError } from "../../error.js";
|
import { ApiError } from "../../error.js";
|
||||||
import { getNote } from "../../common/getters.js";
|
import { getNote } from "../../common/getters.js";
|
||||||
|
import { parseScyllaReaction, prepared, scyllaClient } from "@/db/scylla.js";
|
||||||
|
|
||||||
export const meta = {
|
export const meta = {
|
||||||
tags: ["notes", "reactions"],
|
tags: ["notes", "reactions"],
|
||||||
|
@ -50,7 +51,7 @@ export const paramDef = {
|
||||||
|
|
||||||
export default define(meta, paramDef, async (ps, user) => {
|
export default define(meta, paramDef, async (ps, user) => {
|
||||||
// check note visibility
|
// check note visibility
|
||||||
const note = await getNote(ps.noteId, user).catch((err) => {
|
await getNote(ps.noteId, user).catch((err) => {
|
||||||
if (err.id === "9725d0ce-ba28-4dde-95a7-2cbb2c15de24")
|
if (err.id === "9725d0ce-ba28-4dde-95a7-2cbb2c15de24")
|
||||||
throw new ApiError(meta.errors.noSuchNote);
|
throw new ApiError(meta.errors.noSuchNote);
|
||||||
throw err;
|
throw err;
|
||||||
|
@ -61,8 +62,7 @@ export default define(meta, paramDef, async (ps, user) => {
|
||||||
} as FindOptionsWhere<NoteReaction>;
|
} as FindOptionsWhere<NoteReaction>;
|
||||||
|
|
||||||
if (ps.type) {
|
if (ps.type) {
|
||||||
// ローカルリアクションはホスト名が . とされているが
|
// Remove "." suffix of local emojis here because they are actually null in DB.
|
||||||
// DB 上ではそうではないので、必要に応じて変換
|
|
||||||
const suffix = "@.:";
|
const suffix = "@.:";
|
||||||
const type = ps.type.endsWith(suffix)
|
const type = ps.type.endsWith(suffix)
|
||||||
? `${ps.type.slice(0, ps.type.length - suffix.length)}:`
|
? `${ps.type.slice(0, ps.type.length - suffix.length)}:`
|
||||||
|
@ -70,15 +70,39 @@ export default define(meta, paramDef, async (ps, user) => {
|
||||||
query.reaction = type;
|
query.reaction = type;
|
||||||
}
|
}
|
||||||
|
|
||||||
const reactions = await NoteReactions.find({
|
let reactions: NoteReaction[] = [];
|
||||||
where: query,
|
if (scyllaClient) {
|
||||||
take: ps.limit,
|
const scyllaQuery = [prepared.reaction.select.byNoteId]
|
||||||
skip: ps.offset,
|
const params: (string | string[] | number)[] = [[ps.noteId]];
|
||||||
order: {
|
if (ps.type) {
|
||||||
id: -1,
|
scyllaQuery.push(`AND "reaction" = ?`);
|
||||||
},
|
params.push(query.reaction as string)
|
||||||
relations: ["user", "user.avatar", "user.banner", "note"],
|
}
|
||||||
});
|
scyllaQuery.push("LIMIT ?");
|
||||||
|
params.push(ps.limit);
|
||||||
|
|
||||||
|
// Note: This query fails if the number of returned rows exceeds 5000 by
|
||||||
|
// default, i.e., 5000 reactions with the same emoji from different users.
|
||||||
|
// This limitation can be relaxed via "fetchSize" option.
|
||||||
|
// Note: Remote emojis and local emojis are different.
|
||||||
|
// Ref: https://github.com/datastax/nodejs-driver#paging
|
||||||
|
const result = await scyllaClient.execute(
|
||||||
|
scyllaQuery.join(" "),
|
||||||
|
params,
|
||||||
|
{ prepare: true },
|
||||||
|
);
|
||||||
|
reactions = result.rows.map(parseScyllaReaction);
|
||||||
|
} else {
|
||||||
|
reactions = await NoteReactions.find({
|
||||||
|
where: query,
|
||||||
|
take: ps.limit,
|
||||||
|
skip: ps.offset,
|
||||||
|
order: {
|
||||||
|
id: -1,
|
||||||
|
},
|
||||||
|
relations: ["user", "user.avatar", "user.banner", "note"],
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
return await NoteReactions.packMany(reactions, user);
|
return await NoteReactions.packMany(reactions, user);
|
||||||
});
|
});
|
||||||
|
|
Loading…
Reference in a new issue