perf: query scylla for local timeline
This commit is contained in:
parent
9d30fd8f92
commit
df71ee65b2
4 changed files with 131 additions and 23 deletions
|
@ -352,11 +352,11 @@ export async function filterVisibility(
|
|||
["public", "home"].includes(note.visibility),
|
||||
);
|
||||
} else {
|
||||
let followings: User["id"][];
|
||||
let ids: User["id"][];
|
||||
if (followingIds) {
|
||||
followings = followingIds;
|
||||
ids = followingIds;
|
||||
} else {
|
||||
followings = await LocalFollowingsCache.init(user.id).then((cache) =>
|
||||
ids = await LocalFollowingsCache.init(user.id).then((cache) =>
|
||||
cache.getAll(),
|
||||
);
|
||||
}
|
||||
|
@ -368,7 +368,7 @@ export async function filterVisibility(
|
|||
note.visibleUserIds.includes(user.id) ||
|
||||
note.mentions.includes(user.id) ||
|
||||
(note.visibility === "followers" &&
|
||||
(followings.includes(note.userId) || note.replyUserId === user.id)),
|
||||
(ids.includes(note.userId) || note.replyUserId === user.id)),
|
||||
);
|
||||
}
|
||||
|
||||
|
|
|
@ -142,6 +142,7 @@ export class Cache<T> {
|
|||
|
||||
class SetCache {
|
||||
private readonly key: string;
|
||||
private readonly fetchedKey: string;
|
||||
private readonly fetcher: () => Promise<string[]>;
|
||||
|
||||
protected constructor(
|
||||
|
@ -150,14 +151,16 @@ class SetCache {
|
|||
fetcher: () => Promise<string[]>,
|
||||
) {
|
||||
this.key = `setcache:${name}:${userId}`;
|
||||
this.fetchedKey = `${this.key}:fetched`;
|
||||
this.fetcher = fetcher;
|
||||
}
|
||||
|
||||
protected async fetch() {
|
||||
// Sync from DB if nothing is cached yet or cache is expired
|
||||
if (!(await this.exists())) {
|
||||
if ((await redisClient.exists(this.fetchedKey)) === 0) {
|
||||
await this.clear();
|
||||
await this.add(...(await this.fetcher()));
|
||||
await redisClient.set(this.fetchedKey, "", "EX", 60 * 30);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -167,7 +170,7 @@ class SetCache {
|
|||
await redisClient.sadd(this.key, targetIds);
|
||||
}
|
||||
if ((await redisClient.ttl(this.key)) < 0) {
|
||||
await redisClient.expire(this.key, 60 * 30); // Expires in 30 minutes
|
||||
await redisClient.expire(this.key, 60 * 60);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -197,6 +200,7 @@ class SetCache {
|
|||
|
||||
class HashCache {
|
||||
private readonly key: string;
|
||||
private readonly fetchedKey: string;
|
||||
private readonly fetcher: () => Promise<Map<string, string>>;
|
||||
|
||||
protected constructor(
|
||||
|
@ -205,14 +209,16 @@ class HashCache {
|
|||
fetcher: () => Promise<Map<string, string>>,
|
||||
) {
|
||||
this.key = `hashcache:${name}:${userId}`;
|
||||
this.fetchedKey = `${this.key}:fetched`;
|
||||
this.fetcher = fetcher;
|
||||
}
|
||||
|
||||
protected async fetch() {
|
||||
// Sync from DB if nothing is cached yet or cache is expired
|
||||
if (!(await this.exists())) {
|
||||
await redisClient.del(this.key);
|
||||
if ((await redisClient.exists(this.fetchedKey)) === 0) {
|
||||
await this.clear();
|
||||
await this.setHash(await this.fetcher());
|
||||
await redisClient.set(this.fetchedKey, "", "EX", 60 * 30);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -225,7 +231,7 @@ class HashCache {
|
|||
await redisClient.hset(this.key, hash);
|
||||
}
|
||||
if ((await redisClient.ttl(this.key)) < 0) {
|
||||
await redisClient.expire(this.key, 60 * 30); // Expires in 30 minutes
|
||||
await redisClient.expire(this.key, 60 * 60);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
import { Brackets } from "typeorm";
|
||||
import { fetchMeta } from "@/misc/fetch-meta.js";
|
||||
import { Notes, Users } from "@/models/index.js";
|
||||
import { Notes, UserProfiles, Users } from "@/models/index.js";
|
||||
import { activeUsersChart } from "@/services/chart/index.js";
|
||||
import define from "../../define.js";
|
||||
import { ApiError } from "../../error.js";
|
||||
|
@ -12,6 +12,27 @@ import { generateMutedNoteQuery } from "../../common/generate-muted-note-query.j
|
|||
import { generateChannelQuery } from "../../common/generate-channel-query.js";
|
||||
import { generateBlockedUserQuery } from "../../common/generate-block-query.js";
|
||||
import { generateMutedUserRenotesQueryForNotes } from "../../common/generated-muted-renote-query.js";
|
||||
import {
|
||||
ScyllaNote,
|
||||
execTimelineQuery,
|
||||
filterBlockedUser,
|
||||
filterChannel,
|
||||
filterMutedNote,
|
||||
filterMutedRenotes,
|
||||
filterMutedUser,
|
||||
filterReply,
|
||||
filterVisibility,
|
||||
scyllaClient,
|
||||
} from "@/db/scylla.js";
|
||||
import {
|
||||
ChannelFollowingsCache,
|
||||
InstanceMutingsCache,
|
||||
LocalFollowingsCache,
|
||||
RenoteMutingsCache,
|
||||
UserBlockedCache,
|
||||
UserMutingsCache,
|
||||
userWordMuteCache,
|
||||
} from "@/misc/cache.js";
|
||||
|
||||
export const meta = {
|
||||
tags: ["notes"],
|
||||
|
@ -74,12 +95,97 @@ export const paramDef = {
|
|||
|
||||
export default define(meta, paramDef, async (ps, user) => {
|
||||
const m = await fetchMeta();
|
||||
|
||||
if (m.disableLocalTimeline) {
|
||||
if (user == null || !(user.isAdmin || user.isModerator)) {
|
||||
throw new ApiError(meta.errors.ltlDisabled);
|
||||
}
|
||||
}
|
||||
|
||||
process.nextTick(() => {
|
||||
if (user) {
|
||||
activeUsersChart.read(user);
|
||||
}
|
||||
});
|
||||
|
||||
if (scyllaClient) {
|
||||
let [
|
||||
followingChannelIds,
|
||||
followingUserIds,
|
||||
mutedUserIds,
|
||||
mutedInstances,
|
||||
blockerIds,
|
||||
renoteMutedIds,
|
||||
]: string[][] = [];
|
||||
let mutedWords: string[][];
|
||||
if (user) {
|
||||
[
|
||||
followingChannelIds,
|
||||
followingUserIds,
|
||||
mutedUserIds,
|
||||
mutedInstances,
|
||||
mutedWords,
|
||||
blockerIds,
|
||||
renoteMutedIds,
|
||||
] = await Promise.all([
|
||||
ChannelFollowingsCache.init(user.id).then((cache) => cache.getAll()),
|
||||
LocalFollowingsCache.init(user.id).then((cache) => cache.getAll()),
|
||||
UserMutingsCache.init(user.id).then((cache) => cache.getAll()),
|
||||
InstanceMutingsCache.init(user.id).then((cache) => cache.getAll()),
|
||||
userWordMuteCache
|
||||
.fetchMaybe(user.id, () =>
|
||||
UserProfiles.findOne({
|
||||
select: ["mutedWords"],
|
||||
where: { userId: user.id },
|
||||
}).then((profile) => profile?.mutedWords),
|
||||
)
|
||||
.then((words) => words ?? []),
|
||||
UserBlockedCache.init(user.id).then((cache) => cache.getAll()),
|
||||
RenoteMutingsCache.init(user.id).then((cache) => cache.getAll()),
|
||||
]);
|
||||
}
|
||||
|
||||
const filter = async (notes: ScyllaNote[]) => {
|
||||
let filtered = notes.filter(
|
||||
(n) => n.visibility === "public" && !n.userHost,
|
||||
);
|
||||
filtered = await filterChannel(filtered, user, followingChannelIds);
|
||||
filtered = await filterReply(filtered, ps.withReplies, user);
|
||||
filtered = await filterVisibility(filtered, user, followingUserIds);
|
||||
if (user) {
|
||||
filtered = await filterMutedUser(
|
||||
filtered,
|
||||
user,
|
||||
mutedUserIds,
|
||||
mutedInstances,
|
||||
);
|
||||
filtered = await filterMutedNote(filtered, user, mutedWords);
|
||||
filtered = await filterBlockedUser(filtered, user, blockerIds);
|
||||
filtered = await filterMutedRenotes(filtered, user, renoteMutedIds);
|
||||
}
|
||||
if (ps.withFiles) {
|
||||
filtered = filtered.filter((n) => n.files.length > 0);
|
||||
}
|
||||
if (ps.fileType) {
|
||||
filtered = filtered.filter((n) =>
|
||||
n.files.some((f) => ps.fileType?.includes(f.type)),
|
||||
);
|
||||
}
|
||||
if (ps.excludeNsfw) {
|
||||
filtered = filtered.filter(
|
||||
(n) => !n.cw && n.files.every((f) => !f.isSensitive),
|
||||
);
|
||||
}
|
||||
filtered = filtered.filter((n) => n.visibility !== "hidden");
|
||||
return filtered;
|
||||
};
|
||||
|
||||
const foundNotes = await execTimelineQuery(ps, filter);
|
||||
return await Notes.packMany(foundNotes.slice(0, ps.limit), user, {
|
||||
scyllaNote: true,
|
||||
});
|
||||
}
|
||||
|
||||
//#region Construct query
|
||||
const query = makePaginationQuery(
|
||||
Notes.createQueryBuilder("note"),
|
||||
|
@ -124,12 +230,6 @@ export default define(meta, paramDef, async (ps, user) => {
|
|||
query.andWhere("note.visibility != 'hidden'");
|
||||
//#endregion
|
||||
|
||||
process.nextTick(() => {
|
||||
if (user) {
|
||||
activeUsersChart.read(user);
|
||||
}
|
||||
});
|
||||
|
||||
// We fetch more than requested because some may be filtered out, and if there's less than
|
||||
// requested, the pagination stops.
|
||||
const found = [];
|
||||
|
|
|
@ -97,6 +97,7 @@ export default define(meta, paramDef, async (ps, user) => {
|
|||
followingUserIds,
|
||||
mutedUserIds,
|
||||
mutedInstances,
|
||||
mutedWords,
|
||||
blockerIds,
|
||||
renoteMutedIds,
|
||||
] = await Promise.all([
|
||||
|
@ -104,17 +105,18 @@ export default define(meta, paramDef, async (ps, user) => {
|
|||
followingsCache.getAll(),
|
||||
UserMutingsCache.init(user.id).then((cache) => cache.getAll()),
|
||||
InstanceMutingsCache.init(user.id).then((cache) => cache.getAll()),
|
||||
userWordMuteCache
|
||||
.fetchMaybe(user.id, () =>
|
||||
UserProfiles.findOne({
|
||||
select: ["mutedWords"],
|
||||
where: { userId: user.id },
|
||||
}).then((profile) => profile?.mutedWords),
|
||||
)
|
||||
.then((words) => words ?? []),
|
||||
UserBlockedCache.init(user.id).then((cache) => cache.getAll()),
|
||||
RenoteMutingsCache.init(user.id).then((cache) => cache.getAll()),
|
||||
]);
|
||||
const validUserIds = [user.id].concat(followingUserIds);
|
||||
const mutedWords =
|
||||
(await userWordMuteCache.fetchMaybe(user.id, () =>
|
||||
UserProfiles.findOne({
|
||||
select: ["mutedWords"],
|
||||
where: { userId: user.id },
|
||||
}).then((profile) => profile?.mutedWords),
|
||||
)) ?? [];
|
||||
const optFilter = (n: ScyllaNote) =>
|
||||
!n.renoteId || !!n.text || n.files.length > 0 || n.hasPoll;
|
||||
|
||||
|
|
Loading…
Reference in a new issue