perf: build timelines on scylla
This commit is contained in:
parent
43fb3375be
commit
dbea0ee6fd
14 changed files with 365 additions and 193 deletions
|
@ -2,8 +2,9 @@ DROP MATERIALIZED VIEW IF EXISTS reaction_by_id;
|
|||
DROP MATERIALIZED VIEW IF EXISTS reaction_by_userid;
|
||||
DROP INDEX IF EXISTS reaction_by_id;
|
||||
DROP TABLE IF EXISTS reaction;
|
||||
DROP TABLE IF EXISTS local_feed;
|
||||
DROP TABLE IF EXISTS home_feed;
|
||||
DROP TABLE IF EXISTS home_timeline;
|
||||
DROP MATERIALIZED VIEW IF EXISTS local_timeline;
|
||||
DROP MATERIALIZED VIEW IF EXISTS global_timeline;
|
||||
DROP MATERIALIZED VIEW IF EXISTS note_by_renote_id;
|
||||
DROP MATERIALIZED VIEW IF EXISTS note_by_userid;
|
||||
DROP MATERIALIZED VIEW IF EXISTS note_by_id;
|
||||
|
|
|
@ -29,7 +29,7 @@ CREATE TYPE IF NOT EXISTS emoji (
|
|||
"height" int,
|
||||
);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS note ( -- Models timeline
|
||||
CREATE TABLE IF NOT EXISTS note ( -- Store all posts
|
||||
"createdAtDate" date, -- For partitioning
|
||||
"createdAt" timestamp,
|
||||
"id" ascii, -- Post
|
||||
|
@ -69,7 +69,7 @@ CREATE TABLE IF NOT EXISTS note ( -- Models timeline
|
|||
"reactions" map<text, int>, -- Reactions
|
||||
"noteEdit" set<frozen<note_edit_history>>, -- Edit History
|
||||
"updatedAt" timestamp,
|
||||
PRIMARY KEY ("createdAtDate", "createdAt", "userId")
|
||||
PRIMARY KEY ("createdAtDate", "createdAt", "userId", "userHost", "visibility")
|
||||
) WITH CLUSTERING ORDER BY ("createdAt" DESC);
|
||||
|
||||
CREATE INDEX IF NOT EXISTS note_by_uri ON note ("uri");
|
||||
|
@ -81,7 +81,9 @@ CREATE MATERIALIZED VIEW IF NOT EXISTS note_by_id AS
|
|||
AND "createdAt" IS NOT NULL
|
||||
AND "createdAtDate" IS NOT NULL
|
||||
AND "userId" IS NOT NULL
|
||||
PRIMARY KEY ("id", "createdAt", "createdAtDate", "userId")
|
||||
AND "userHost" IS NOT NULL
|
||||
AND "visibility" IS NOT NULL
|
||||
PRIMARY KEY ("id", "createdAt", "createdAtDate", "userId", "userHost", "visibility")
|
||||
WITH CLUSTERING ORDER BY ("createdAt" DESC);
|
||||
|
||||
CREATE MATERIALIZED VIEW IF NOT EXISTS note_by_user_id AS
|
||||
|
@ -89,7 +91,9 @@ CREATE MATERIALIZED VIEW IF NOT EXISTS note_by_user_id AS
|
|||
WHERE "userId" IS NOT NULL
|
||||
AND "createdAt" IS NOT NULL
|
||||
AND "createdAtDate" IS NOT NULL
|
||||
PRIMARY KEY ("userId", "createdAt", "createdAtDate")
|
||||
AND "userHost" IS NOT NULL
|
||||
AND "visibility" IS NOT NULL
|
||||
PRIMARY KEY ("userId", "createdAt", "createdAtDate", "userHost", "visibility")
|
||||
WITH CLUSTERING ORDER BY ("createdAt" DESC);
|
||||
|
||||
CREATE MATERIALIZED VIEW IF NOT EXISTS note_by_renote_id AS
|
||||
|
@ -98,35 +102,72 @@ CREATE MATERIALIZED VIEW IF NOT EXISTS note_by_renote_id AS
|
|||
AND "createdAt" IS NOT NULL
|
||||
AND "createdAtDate" IS NOT NULL
|
||||
AND "userId" IS NOT NULL
|
||||
PRIMARY KEY ("renoteId", "createdAt", "createdAtDate", "userId")
|
||||
AND "userHost" IS NOT NULL
|
||||
AND "visibility" IS NOT NULL
|
||||
PRIMARY KEY ("renoteId", "createdAt", "createdAtDate", "userId", "userHost", "visibility")
|
||||
WITH CLUSTERING ORDER BY ("createdAt" DESC);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS home_feed (
|
||||
"userId" ascii,
|
||||
"fedAtDate" date,
|
||||
"fedAt" timestamp,
|
||||
"noteId" ascii,
|
||||
"noteUserId" ascii,
|
||||
"noteUserHost" text,
|
||||
"replyUserId" ascii,
|
||||
"replyUserHost" text,
|
||||
"renoteUserId" ascii,
|
||||
"renoteUserHost" text,
|
||||
PRIMARY KEY (("userId", "fedAtDate"), "fedAt")
|
||||
) WITH CLUSTERING ORDER BY ("fedAt" DESC);
|
||||
CREATE MATERIALIZED VIEW IF NOT EXISTS global_timeline AS
|
||||
SELECT * FROM note
|
||||
WHERE "createdAtDate" IS NOT NULL
|
||||
AND "createdAt" IS NOT NULL
|
||||
AND "userId" IS NOT NULL
|
||||
AND "userHost" IS NOT NULL
|
||||
AND "visibility" = "public"
|
||||
PRIMARY KEY ("createdAtDate", "createdAt", "userId", "userHost", "visibility");
|
||||
|
||||
CREATE TABLE IF NOT EXISTS local_feed (
|
||||
"fedAtDate" date,
|
||||
"fedAt" timestamp,
|
||||
"noteId" ascii,
|
||||
"noteUserId" ascii,
|
||||
"noteUserHost" text,
|
||||
CREATE MATERIALIZED VIEW IF NOT EXISTS local_timeline AS
|
||||
SELECT * FROM note
|
||||
WHERE "createdAtDate" IS NOT NULL
|
||||
AND "createdAt" IS NOT NULL
|
||||
AND "userId" IS NOT NULL
|
||||
AND "userHost" = "local"
|
||||
AND "visibility" = "public"
|
||||
PRIMARY KEY ("createdAtDate", "createdAt", "userId", "userHost", "visibility");
|
||||
|
||||
CREATE TABLE IF NOT EXISTS home_timeline (
|
||||
"feedUserId" ascii, -- For partitioning
|
||||
"createdAtDate" date, -- For partitioning
|
||||
"createdAt" timestamp,
|
||||
"id" ascii, -- Post
|
||||
"visibility" ascii,
|
||||
"content" text,
|
||||
"name" text,
|
||||
"cw" text,
|
||||
"localOnly" boolean,
|
||||
"renoteCount" int,
|
||||
"repliesCount" int,
|
||||
"uri" text,
|
||||
"url" text,
|
||||
"score" int,
|
||||
"files" set<frozen<drive_file>>,
|
||||
"visibleUserIds" set<ascii>,
|
||||
"mentions" set<ascii>,
|
||||
"mentionedRemoteUsers" text,
|
||||
"emojis" set<text>,
|
||||
"tags" set<text>,
|
||||
"hasPoll" boolean,
|
||||
"threadId" ascii,
|
||||
"channelId" ascii, -- Channel
|
||||
"userId" ascii, -- User
|
||||
"userHost" text,
|
||||
"replyId" ascii, -- Reply
|
||||
"replyUserId" ascii,
|
||||
"replyUserHost" text,
|
||||
"replyContent" text,
|
||||
"replyCw" text,
|
||||
"replyFiles" set<frozen<drive_file>>,
|
||||
"renoteId" ascii, -- Boost
|
||||
"renoteUserId" ascii,
|
||||
"renoteUserHost" text,
|
||||
PRIMARY KEY ("fedAtDate", "fedAt")
|
||||
) WITH CLUSTERING ORDER BY ("fedAt" DESC);
|
||||
"renoteContent" text,
|
||||
"renoteCw" text,
|
||||
"renoteFiles" set<frozen<drive_file>>,
|
||||
"reactions" map<text, int>, -- Reactions
|
||||
"noteEdit" set<frozen<note_edit_history>>, -- Edit History
|
||||
"updatedAt" timestamp,
|
||||
PRIMARY KEY (("feedUserId", "createdAtDate"), "createdAt")
|
||||
) WITH CLUSTERING ORDER BY ("createdAt" DESC);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS reaction (
|
||||
"id" text,
|
||||
|
|
137
packages/backend/src/db/cql.ts
Normal file
137
packages/backend/src/db/cql.ts
Normal file
|
@ -0,0 +1,137 @@
|
|||
export const scyllaQueries = {
|
||||
note: {
|
||||
insert: `INSERT INTO note (
|
||||
"createdAtDate",
|
||||
"createdAt",
|
||||
"id",
|
||||
"visibility",
|
||||
"content",
|
||||
"name",
|
||||
"cw",
|
||||
"localOnly",
|
||||
"renoteCount",
|
||||
"repliesCount",
|
||||
"uri",
|
||||
"url",
|
||||
"score",
|
||||
"files",
|
||||
"visibleUserIds",
|
||||
"mentions",
|
||||
"mentionedRemoteUsers",
|
||||
"emojis",
|
||||
"tags",
|
||||
"hasPoll",
|
||||
"threadId",
|
||||
"channelId",
|
||||
"userId",
|
||||
"userHost",
|
||||
"replyId",
|
||||
"replyUserId",
|
||||
"replyUserHost",
|
||||
"replyContent",
|
||||
"replyCw",
|
||||
"replyFiles",
|
||||
"renoteId",
|
||||
"renoteUserId",
|
||||
"renoteUserHost",
|
||||
"renoteContent",
|
||||
"renoteCw",
|
||||
"renoteFiles",
|
||||
"reactions",
|
||||
"noteEdit",
|
||||
"updatedAt"
|
||||
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`,
|
||||
select: {
|
||||
byDate: `SELECT * FROM note WHERE "createdAtDate" = ?`,
|
||||
byUri: `SELECT * FROM note WHERE "uri" = ?`,
|
||||
byUrl: `SELECT * FROM note WHERE "url" = ?`,
|
||||
byId: `SELECT * FROM note_by_id WHERE "id" IN ?`,
|
||||
byUserId: `SELECT * FROM note_by_user_id WHERE "userId" IN ?`,
|
||||
byRenoteId: `SELECT * FROM note_by_renote_id WHERE "renoteId" = ?`,
|
||||
},
|
||||
delete: `DELETE FROM note WHERE "createdAtDate" = ? AND "createdAt" = ? AND "userId" = ?`,
|
||||
update: {
|
||||
renoteCount: `UPDATE note SET
|
||||
"renoteCount" = ?,
|
||||
"score" = ?
|
||||
WHERE "createdAtDate" = ? AND "createdAt" = ? AND "userId" = ? IF EXISTS`,
|
||||
repliesCount: `UPDATE note SET
|
||||
"repliesCount" = ?
|
||||
WHERE "createdAtDate" = ? AND "createdAt" = ? AND "userId" = ? IF EXISTS`,
|
||||
reactions: `UPDATE note SET
|
||||
"emojis" = ?,
|
||||
"reactions" = ?,
|
||||
"score" = ?
|
||||
WHERE "createdAtDate" = ? AND "createdAt" = ? AND "userId" = ? IF EXISTS`,
|
||||
},
|
||||
},
|
||||
homeTimeline: {
|
||||
insert: `INSERT INTO home_timeline (
|
||||
"feedUserId",
|
||||
"createdAtDate",
|
||||
"createdAt",
|
||||
"id",
|
||||
"visibility",
|
||||
"content",
|
||||
"name",
|
||||
"cw",
|
||||
"localOnly",
|
||||
"renoteCount",
|
||||
"repliesCount",
|
||||
"uri",
|
||||
"url",
|
||||
"score",
|
||||
"files",
|
||||
"visibleUserIds",
|
||||
"mentions",
|
||||
"mentionedRemoteUsers",
|
||||
"emojis",
|
||||
"tags",
|
||||
"hasPoll",
|
||||
"threadId",
|
||||
"channelId",
|
||||
"userId",
|
||||
"userHost",
|
||||
"replyId",
|
||||
"replyUserId",
|
||||
"replyUserHost",
|
||||
"replyContent",
|
||||
"replyCw",
|
||||
"replyFiles",
|
||||
"renoteId",
|
||||
"renoteUserId",
|
||||
"renoteUserHost",
|
||||
"renoteContent",
|
||||
"renoteCw",
|
||||
"renoteFiles",
|
||||
"reactions",
|
||||
"noteEdit",
|
||||
"updatedAt"
|
||||
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`,
|
||||
select: {
|
||||
byUserAndDate: `SELECT * FROM home_timeline WHERE "feedUserId" = ? AND "createdAtDate" = ?`,
|
||||
},
|
||||
},
|
||||
localTimeline: {
|
||||
select: {
|
||||
byDate: `SELECT * FROM local_timeline WHERE "createdAtDate" = ?`,
|
||||
},
|
||||
},
|
||||
globalTimeline: {
|
||||
select: {
|
||||
byDate: `SELECT * FROM global_timeline WHERE "createdAtDate" = ?`,
|
||||
},
|
||||
},
|
||||
reaction: {
|
||||
insert: `INSERT INTO reaction
|
||||
("id", "noteId", "userId", "reaction", "emoji", "createdAt")
|
||||
VALUES (?, ?, ?, ?, ?, ?)`,
|
||||
select: {
|
||||
byNoteId: `SELECT * FROM reaction_by_id WHERE "noteId" IN ?`,
|
||||
byUserId: `SELECT * FROM reaction_by_user_id WHERE "userId" IN ?`,
|
||||
byNoteAndUser: `SELECT * FROM reaction WHERE "noteId" IN ? AND "userId" IN ?`,
|
||||
byId: `SELECT * FROM reaction WHERE "id" IN ?`,
|
||||
},
|
||||
delete: `DELETE FROM reaction WHERE "noteId" = ? AND "userId" = ?`,
|
||||
},
|
||||
};
|
|
@ -19,6 +19,7 @@ import Logger from "@/services/logger.js";
|
|||
import { UserProfiles } from "@/models/index.js";
|
||||
import { getWordHardMute } from "@/misc/check-word-mute.js";
|
||||
import type { UserProfile } from "@/models/entities/user-profile.js";
|
||||
import { scyllaQueries } from "./cql";
|
||||
|
||||
function newClient(): Client | null {
|
||||
if (!config.scylla) {
|
||||
|
@ -62,88 +63,7 @@ function newClient(): Client | null {
|
|||
|
||||
export const scyllaClient = newClient();
|
||||
|
||||
export const prepared = {
|
||||
note: {
|
||||
insert: `INSERT INTO note (
|
||||
"createdAtDate",
|
||||
"createdAt",
|
||||
"id",
|
||||
"visibility",
|
||||
"content",
|
||||
"name",
|
||||
"cw",
|
||||
"localOnly",
|
||||
"renoteCount",
|
||||
"repliesCount",
|
||||
"uri",
|
||||
"url",
|
||||
"score",
|
||||
"files",
|
||||
"visibleUserIds",
|
||||
"mentions",
|
||||
"mentionedRemoteUsers",
|
||||
"emojis",
|
||||
"tags",
|
||||
"hasPoll",
|
||||
"threadId",
|
||||
"channelId",
|
||||
"userId",
|
||||
"userHost",
|
||||
"replyId",
|
||||
"replyUserId",
|
||||
"replyUserHost",
|
||||
"replyContent",
|
||||
"replyCw",
|
||||
"replyFiles",
|
||||
"renoteId",
|
||||
"renoteUserId",
|
||||
"renoteUserHost",
|
||||
"renoteContent",
|
||||
"renoteCw",
|
||||
"renoteFiles",
|
||||
"reactions",
|
||||
"noteEdit",
|
||||
"updatedAt"
|
||||
)
|
||||
VALUES
|
||||
(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`,
|
||||
select: {
|
||||
byDate: `SELECT * FROM note WHERE "createdAtDate" = ?`,
|
||||
byUri: `SELECT * FROM note WHERE "uri" = ?`,
|
||||
byUrl: `SELECT * FROM note WHERE "url" = ?`,
|
||||
byId: `SELECT * FROM note_by_id WHERE "id" IN ?`,
|
||||
byUserId: `SELECT * FROM note_by_user_id WHERE "userId" IN ?`,
|
||||
byRenoteId: `SELECT * FROM note_by_renote_id WHERE "renoteId" = ?`,
|
||||
},
|
||||
delete: `DELETE FROM note WHERE "createdAtDate" = ? AND "createdAt" = ? AND "userId" = ?`,
|
||||
update: {
|
||||
renoteCount: `UPDATE note SET
|
||||
"renoteCount" = ?,
|
||||
"score" = ?
|
||||
WHERE "createdAtDate" = ? AND "createdAt" = ? AND "userId" = ? IF EXISTS`,
|
||||
repliesCount: `UPDATE note SET
|
||||
"repliesCount" = ?
|
||||
WHERE "createdAtDate" = ? AND "createdAt" = ? AND "userId" = ? IF EXISTS`,
|
||||
reactions: `UPDATE note SET
|
||||
"emojis" = ?,
|
||||
"reactions" = ?,
|
||||
"score" = ?
|
||||
WHERE "createdAtDate" = ? AND "createdAt" = ? AND "userId" = ? IF EXISTS`,
|
||||
},
|
||||
},
|
||||
reaction: {
|
||||
insert: `INSERT INTO reaction
|
||||
("id", "noteId", "userId", "reaction", "emoji", "createdAt")
|
||||
VALUES (?, ?, ?, ?, ?, ?)`,
|
||||
select: {
|
||||
byNoteId: `SELECT * FROM reaction_by_id WHERE "noteId" IN ?`,
|
||||
byUserId: `SELECT * FROM reaction_by_user_id WHERE "userId" IN ?`,
|
||||
byNoteAndUser: `SELECT * FROM reaction WHERE "noteId" IN ? AND "userId" IN ?`,
|
||||
byId: `SELECT * FROM reaction WHERE "id" IN ?`,
|
||||
},
|
||||
delete: `DELETE FROM reaction WHERE "noteId" = ? AND "userId" = ?`,
|
||||
},
|
||||
};
|
||||
export const prepared = scyllaQueries;
|
||||
|
||||
export interface ScyllaDriveFile {
|
||||
id: string;
|
||||
|
@ -183,6 +103,7 @@ export type ScyllaNote = Note & {
|
|||
|
||||
export function parseScyllaNote(row: types.Row): ScyllaNote {
|
||||
const files: ScyllaDriveFile[] = row.get("files") ?? [];
|
||||
const userHost = row.get("userHost");
|
||||
|
||||
return {
|
||||
createdAtDate: row.get("createdAtDate"),
|
||||
|
@ -209,7 +130,7 @@ export function parseScyllaNote(row: types.Row): ScyllaNote {
|
|||
threadId: row.get("threadId") ?? null,
|
||||
channelId: row.get("channelId") ?? null,
|
||||
userId: row.get("userId"),
|
||||
userHost: row.get("userHost") ?? null,
|
||||
userHost: userHost !== "local" ? userHost : null,
|
||||
replyId: row.get("replyId") ?? null,
|
||||
replyUserId: row.get("replyUserId") ?? null,
|
||||
replyUserHost: row.get("replyUserHost") ?? null,
|
||||
|
@ -240,6 +161,8 @@ export interface ScyllaNoteReaction extends NoteReaction {
|
|||
|
||||
const QUERY_LIMIT = 1000; // TODO: should this be configurable?
|
||||
|
||||
export type TimelineKind = "home" | "local" | "recommended" | "global" | "renotes";
|
||||
|
||||
export function parseScyllaReaction(row: types.Row): ScyllaNoteReaction {
|
||||
return {
|
||||
id: row.get("id"),
|
||||
|
@ -251,18 +174,35 @@ export function parseScyllaReaction(row: types.Row): ScyllaNoteReaction {
|
|||
};
|
||||
}
|
||||
|
||||
export function prepareNoteQuery(ps: {
|
||||
export function prepareNoteQuery(
|
||||
kind: TimelineKind,
|
||||
ps: {
|
||||
untilId?: string;
|
||||
untilDate?: number;
|
||||
sinceId?: string;
|
||||
sinceDate?: number;
|
||||
noteId?: string;
|
||||
}): { query: string; untilDate: Date; sinceDate: Date | null } {
|
||||
const queryParts = [
|
||||
`${
|
||||
ps.noteId ? prepared.note.select.byRenoteId : prepared.note.select.byDate
|
||||
} AND "createdAt" < ?`,
|
||||
];
|
||||
const queryParts: string[] = [];
|
||||
|
||||
switch (kind) {
|
||||
case "home":
|
||||
queryParts.push(prepared.homeTimeline.select.byUserAndDate)
|
||||
break;
|
||||
case "local":
|
||||
queryParts.push(prepared.localTimeline.select.byDate);
|
||||
break;
|
||||
case "recommended":
|
||||
case "global":
|
||||
queryParts.push(prepared.globalTimeline.select.byDate);
|
||||
break;
|
||||
case "renotes":
|
||||
queryParts.push(prepared.note.select.byRenoteId);
|
||||
break;
|
||||
default:
|
||||
queryParts.push(prepared.note.select.byDate);
|
||||
}
|
||||
|
||||
queryParts.push(`AND "createdAt" < ?`);
|
||||
|
||||
let until = new Date();
|
||||
if (ps.untilId) {
|
||||
|
@ -294,6 +234,7 @@ export function prepareNoteQuery(ps: {
|
|||
}
|
||||
|
||||
export async function execNotePaginationQuery(
|
||||
kind: TimelineKind,
|
||||
ps: {
|
||||
limit: number;
|
||||
untilId?: string;
|
||||
|
@ -303,11 +244,19 @@ export async function execNotePaginationQuery(
|
|||
noteId?: string;
|
||||
},
|
||||
filter?: (_: ScyllaNote[]) => Promise<ScyllaNote[]>,
|
||||
userId: User["id"] | null = null,
|
||||
maxPartitions = config.scylla?.sparseTimelineDays ?? 14,
|
||||
): Promise<ScyllaNote[]> {
|
||||
if (!scyllaClient) return [];
|
||||
|
||||
let { query, untilDate, sinceDate } = prepareNoteQuery(ps);
|
||||
if (kind === "home" && !userId) {
|
||||
throw new Error("Query of home timeline needs userId");
|
||||
}
|
||||
if (kind === "renotes" && !ps.noteId) {
|
||||
throw new Error("Query of renotes needs noteId");
|
||||
}
|
||||
|
||||
let { query, untilDate, sinceDate } = prepareNoteQuery(kind, ps);
|
||||
|
||||
let scannedPartitions = 0;
|
||||
const foundNotes: ScyllaNote[] = [];
|
||||
|
@ -315,7 +264,11 @@ export async function execNotePaginationQuery(
|
|||
// Try to get posts of at most <maxPartitions> in the single request
|
||||
while (foundNotes.length < ps.limit && scannedPartitions < maxPartitions) {
|
||||
const params: (Date | string | string[] | number)[] = [];
|
||||
if (ps.noteId) {
|
||||
if (kind === "home" && userId) {
|
||||
params.push(userId);
|
||||
}
|
||||
|
||||
if (kind === "renotes" && ps.noteId) {
|
||||
params.push(ps.noteId, untilDate);
|
||||
} else {
|
||||
params.push(untilDate, untilDate);
|
||||
|
|
|
@ -5,7 +5,6 @@ import {
|
|||
Blockings,
|
||||
ChannelFollowings,
|
||||
Followings,
|
||||
MutedNotes,
|
||||
Mutings,
|
||||
RenoteMutings,
|
||||
UserProfiles,
|
||||
|
@ -289,6 +288,25 @@ export class LocalFollowingsCache extends SetCache {
|
|||
}
|
||||
}
|
||||
|
||||
export class LocalFollowersCache extends SetCache {
|
||||
private constructor(userId: string) {
|
||||
const fetcher = () =>
|
||||
Followings.find({
|
||||
select: ["followerId"],
|
||||
where: { followeeId: userId, followerHost: IsNull() },
|
||||
}).then((followers) => followers.map(({ followerId }) => followerId));
|
||||
|
||||
super("follower", userId, fetcher);
|
||||
}
|
||||
|
||||
public static async init(userId: string): Promise<LocalFollowersCache> {
|
||||
const cache = new LocalFollowersCache(userId);
|
||||
await cache.fetch();
|
||||
|
||||
return cache;
|
||||
}
|
||||
}
|
||||
|
||||
export class ChannelFollowingsCache extends SetCache {
|
||||
private constructor(userId: string) {
|
||||
const fetcher = () =>
|
||||
|
|
|
@ -140,7 +140,7 @@ export default define(meta, paramDef, async (ps, user) => {
|
|||
|
||||
const foundPacked = [];
|
||||
while (foundPacked.length < ps.limit) {
|
||||
const foundNotes = (await execNotePaginationQuery(ps, filter)).slice(
|
||||
const foundNotes = (await execNotePaginationQuery("global", ps, filter)).slice(
|
||||
0,
|
||||
ps.limit * 1.5,
|
||||
); // Some may filtered out by Notes.packMany, thus we take more than ps.limit.
|
||||
|
|
|
@ -167,7 +167,7 @@ export default define(meta, paramDef, async (ps, user) => {
|
|||
|
||||
const foundPacked = [];
|
||||
while (foundPacked.length < ps.limit) {
|
||||
const foundNotes = (await execNotePaginationQuery(ps, filter)).slice(
|
||||
const foundNotes = (await execNotePaginationQuery("global", ps, filter)).slice(
|
||||
0,
|
||||
ps.limit * 1.5,
|
||||
); // Some may filtered out by Notes.packMany, thus we take more than ps.limit.
|
||||
|
|
|
@ -146,10 +146,7 @@ export default define(meta, paramDef, async (ps, user) => {
|
|||
}
|
||||
|
||||
const filter = async (notes: ScyllaNote[]) => {
|
||||
let filtered = notes.filter(
|
||||
(n) => n.visibility === "public" && !n.userHost,
|
||||
);
|
||||
filtered = await filterChannel(filtered, user, followingChannelIds);
|
||||
let filtered = await filterChannel(notes, user, followingChannelIds);
|
||||
filtered = await filterReply(filtered, ps.withReplies, user);
|
||||
filtered = await filterVisibility(filtered, user, followingUserIds);
|
||||
if (user) {
|
||||
|
@ -182,7 +179,7 @@ export default define(meta, paramDef, async (ps, user) => {
|
|||
|
||||
const foundPacked = [];
|
||||
while (foundPacked.length < ps.limit) {
|
||||
const foundNotes = (await execNotePaginationQuery(ps, filter)).slice(
|
||||
const foundNotes = (await execNotePaginationQuery("local", ps, filter)).slice(
|
||||
0,
|
||||
ps.limit * 1.5,
|
||||
); // Some may filtered out by Notes.packMany, thus we take more than ps.limit.
|
||||
|
|
|
@ -177,7 +177,7 @@ export default define(meta, paramDef, async (ps, user) => {
|
|||
return filtered;
|
||||
};
|
||||
|
||||
const foundNotes = await execNotePaginationQuery(ps, filter);
|
||||
const foundNotes = await execNotePaginationQuery("recommended", ps, filter);
|
||||
return await Notes.packMany(foundNotes.slice(0, ps.limit), user, {
|
||||
scyllaNote: true,
|
||||
});
|
||||
|
|
|
@ -154,7 +154,7 @@ export default define(meta, paramDef, async (ps, user) => {
|
|||
|
||||
const foundPacked = [];
|
||||
while (foundPacked.length < ps.limit) {
|
||||
const foundNotes = (await execNotePaginationQuery(ps, filter)).slice(
|
||||
const foundNotes = (await execNotePaginationQuery("home", ps, filter, user.id)).slice(
|
||||
0,
|
||||
ps.limit * 1.5,
|
||||
); // Some may filtered out by Notes.packMany, thus we take more than ps.limit.
|
||||
|
|
|
@ -28,7 +28,7 @@ import type { Packed } from "@/misc/schema.js";
|
|||
import { getActiveWebhooks } from "@/misc/webhook-cache.js";
|
||||
import { webhookDeliver } from "@/queue/index.js";
|
||||
import { shouldSilenceInstance } from "@/misc/should-block-instance.js";
|
||||
import { LocalFollowingsCache } from "@/misc/cache.js";
|
||||
import { LocalFollowersCache, LocalFollowingsCache } from "@/misc/cache.js";
|
||||
|
||||
const logger = new Logger("following/create");
|
||||
|
||||
|
@ -83,9 +83,11 @@ export async function insertFollowingDoc(
|
|||
});
|
||||
|
||||
if (Users.isLocalUser(follower)) {
|
||||
// Cache following ID set
|
||||
const cache = await LocalFollowingsCache.init(follower.id);
|
||||
await cache.add(followee.id);
|
||||
// Cache relationship
|
||||
const followCache = await LocalFollowingsCache.init(follower.id);
|
||||
const followerCache = await LocalFollowersCache.init(followee.id);
|
||||
await followCache.add(followee.id);
|
||||
await followerCache.add(follower.id);
|
||||
}
|
||||
|
||||
const req = await FollowRequests.findOneBy({
|
||||
|
|
|
@ -13,7 +13,7 @@ import {
|
|||
perUserFollowingChart,
|
||||
} from "@/services/chart/index.js";
|
||||
import { getActiveWebhooks } from "@/misc/webhook-cache.js";
|
||||
import { LocalFollowingsCache } from "@/misc/cache.js";
|
||||
import { LocalFollowersCache, LocalFollowingsCache } from "@/misc/cache.js";
|
||||
|
||||
const logger = new Logger("following/delete");
|
||||
|
||||
|
@ -47,8 +47,10 @@ export default async function (
|
|||
await Followings.delete(following.id);
|
||||
|
||||
if (Users.isLocalUser(follower)) {
|
||||
const cache = await LocalFollowingsCache.init(follower.id);
|
||||
await cache.delete(followee.id);
|
||||
const followCache = await LocalFollowingsCache.init(follower.id);
|
||||
const followerCache = await LocalFollowersCache.init(followee.id);
|
||||
await followCache.delete(followee.id);
|
||||
await followerCache.delete(follower.id);
|
||||
}
|
||||
|
||||
decrementFollowing(follower, followee);
|
||||
|
|
|
@ -8,7 +8,7 @@ import { User } from "@/models/entities/user.js";
|
|||
import { Users, FollowRequests, Followings } from "@/models/index.js";
|
||||
import { decrementFollowing } from "./delete.js";
|
||||
import { getActiveWebhooks } from "@/misc/webhook-cache.js";
|
||||
import { LocalFollowingsCache } from "@/misc/cache.js";
|
||||
import { LocalFollowersCache, LocalFollowingsCache } from "@/misc/cache.js";
|
||||
|
||||
type Local =
|
||||
| ILocalUser
|
||||
|
@ -93,8 +93,10 @@ async function removeFollow(followee: Both, follower: Both) {
|
|||
|
||||
await Followings.delete(following.id);
|
||||
if (Users.isLocalUser(follower)) {
|
||||
const cache = await LocalFollowingsCache.init(follower.id);
|
||||
await cache.delete(followee.id);
|
||||
const followCache = await LocalFollowingsCache.init(follower.id);
|
||||
const followerCache = await LocalFollowersCache.init(followee.id);
|
||||
await followCache.delete(followee.id);
|
||||
await followerCache.delete(follower.id);
|
||||
}
|
||||
decrementFollowing(follower, followee);
|
||||
}
|
||||
|
|
|
@ -60,7 +60,7 @@ import { normalizeForSearch } from "@/misc/normalize-for-search.js";
|
|||
import { getAntennas } from "@/misc/antenna-cache.js";
|
||||
import { endedPollNotificationQueue } from "@/queue/queues.js";
|
||||
import { webhookDeliver } from "@/queue/index.js";
|
||||
import { Cache } from "@/misc/cache.js";
|
||||
import { Cache, LocalFollowersCache } from "@/misc/cache.js";
|
||||
import type { UserProfile } from "@/models/entities/user-profile.js";
|
||||
import { db } from "@/db/postgre.js";
|
||||
import { getActiveWebhooks } from "@/misc/webhook-cache.js";
|
||||
|
@ -779,10 +779,23 @@ async function insertNote(
|
|||
width: file.properties.width ?? null,
|
||||
height: file.properties.height ?? null,
|
||||
});
|
||||
const replyText = data.reply?.text ?? null;
|
||||
const replyCw = data.reply?.cw ?? null;
|
||||
// TODO: move drive files to scylla or cache in redis/dragonfly
|
||||
const replyFiles = data.reply?.fileIds
|
||||
? await DriveFiles.findBy({ id: In(data.reply.fileIds) }).then(
|
||||
(files) => files.map(fileMapper),
|
||||
)
|
||||
: null;
|
||||
const renoteText = data.renote?.text ?? null;
|
||||
const renoteCw = data.renote?.text ?? null;
|
||||
const renoteFiles = data.renote?.fileIds
|
||||
? await DriveFiles.findBy({ id: In(data.renote.fileIds) }).then(
|
||||
(files) => files.map(fileMapper),
|
||||
)
|
||||
: null;
|
||||
|
||||
await scyllaClient.execute(
|
||||
prepared.note.insert,
|
||||
[
|
||||
const params = [
|
||||
insert.createdAt,
|
||||
insert.createdAt,
|
||||
insert.id,
|
||||
|
@ -806,33 +819,39 @@ async function insertNote(
|
|||
insert.threadId,
|
||||
insert.channelId,
|
||||
insert.userId,
|
||||
insert.userHost,
|
||||
insert.userHost ?? "local",
|
||||
insert.replyId,
|
||||
insert.replyUserId,
|
||||
insert.replyUserHost,
|
||||
data.reply?.text ?? null,
|
||||
data.reply?.cw ?? null,
|
||||
data.reply?.fileIds
|
||||
? await DriveFiles.findBy({ id: In(data.reply.fileIds) }).then(
|
||||
(files) => files.map(fileMapper),
|
||||
)
|
||||
: null,
|
||||
replyText,
|
||||
replyCw,
|
||||
replyFiles,
|
||||
insert.renoteId,
|
||||
insert.renoteUserId,
|
||||
insert.renoteUserHost,
|
||||
data.renote?.text ?? null,
|
||||
data.renote?.cw ?? null,
|
||||
data.renote?.fileIds
|
||||
? await DriveFiles.findBy({ id: In(data.renote.fileIds) }).then(
|
||||
(files) => files.map(fileMapper),
|
||||
)
|
||||
: null,
|
||||
renoteText,
|
||||
renoteCw,
|
||||
renoteFiles,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
],
|
||||
{ prepare: true },
|
||||
];
|
||||
|
||||
await scyllaClient.execute(prepared.note.insert, params, {
|
||||
prepare: true,
|
||||
});
|
||||
|
||||
// Insert to home timelines
|
||||
const localFollowers = await LocalFollowersCache.init(user.id).then(
|
||||
(cache) => cache.getAll(),
|
||||
);
|
||||
// Do not issue BATCH because different queries of inserting post to home timelines involve different partitions
|
||||
for (const follower of localFollowers) {
|
||||
// no need to wait
|
||||
scyllaClient.execute(prepared.homeTimeline.insert, [follower, ...params], {
|
||||
prepare: true,
|
||||
});
|
||||
}
|
||||
}
|
||||
if (insert.hasPoll) {
|
||||
// Start transaction
|
||||
|
|
Loading…
Reference in a new issue