perf: mutings cache

This commit is contained in:
Namekuji 2023-08-05 15:52:13 -04:00
parent dabb19ef7c
commit 83c00ee340
No known key found for this signature in database
GPG key ID: 1D62332C07FBA532
16 changed files with 251 additions and 48 deletions

View file

@ -5,7 +5,12 @@ import type { Note } from "@/models/entities/note.js";
import type { NoteReaction } from "@/models/entities/note-reaction.js";
import { Client, types, tracker } from "cassandra-driver";
import type { User } from "@/models/entities/user.js";
import { ChannelFollowingsCache, LocalFollowingsCache } from "@/misc/cache.js";
import {
ChannelFollowingsCache,
InstanceMutingsCache,
LocalFollowingsCache,
UserMutingsCache,
} from "@/misc/cache.js";
import { getTimestamp } from "@/misc/gen-id.js";
import Logger from "@/services/logger.js";
@ -267,13 +272,12 @@ export async function execTimelineQuery(
let { query, untilDate, sinceDate } = prepareTimelineQuery(ps);
let scannedPartitions = 0;
let scannedEmptyPartitions = 0;
const foundNotes: ScyllaNote[] = [];
// Try to get posts of at most <maxDays> in the single request
while (foundNotes.length < ps.limit && scannedPartitions < maxDays) {
while (foundNotes.length < ps.limit && scannedEmptyPartitions < maxDays) {
const params: (Date | string | string[])[] = [untilDate, untilDate];
if (sinceDate) {
params.push(sinceDate);
}
@ -284,7 +288,7 @@ export async function execTimelineQuery(
if (result.rowLength === 0) {
// Reached the end of partition. Queries posts created one day before.
scannedPartitions++;
scannedEmptyPartitions++;
untilDate = new Date(
untilDate.getFullYear(),
untilDate.getMonth(),
@ -299,6 +303,8 @@ export async function execTimelineQuery(
continue;
}
scannedEmptyPartitions = 0;
const notes = result.rows.map(parseScyllaNote);
foundNotes.push(...(filter ? await filter(notes) : notes));
untilDate = notes[notes.length - 1].createdAt;
@ -392,3 +398,29 @@ export async function filterReply(
return filtered;
}
export async function filterMutedUser(
notes: ScyllaNote[],
user: { id: User["id"] },
exclude?: User,
) {
const userCache = await UserMutingsCache.init(user.id);
let mutedUserIds = await userCache.getAll();
if (exclude) {
mutedUserIds = mutedUserIds.filter((id) => id !== exclude.id);
}
const instanceCache = await InstanceMutingsCache.init(user.id);
const mutedInstances = await instanceCache.getAll();
return notes.filter(
(note) =>
!mutedUserIds.includes(note.userId) &&
!(note.replyUserId && mutedUserIds.includes(note.replyUserId)) &&
!(note.renoteUserId && mutedUserIds.includes(note.renoteUserId)) &&
!(note.userHost && mutedInstances.includes(note.userHost)) &&
!(note.replyUserHost && mutedInstances.includes(note.replyUserHost)) &&
!(note.renoteUserHost && mutedInstances.includes(note.renoteUserHost)),
);
}

View file

@ -1,7 +1,12 @@
import { redisClient } from "@/db/redis.js";
import { encode, decode } from "msgpackr";
import { ChainableCommander } from "ioredis";
import { ChannelFollowings, Followings } from "@/models/index.js";
import {
ChannelFollowings,
Followings,
Mutings,
UserProfiles,
} from "@/models/index.js";
import { IsNull } from "typeorm";
export class Cache<T> {
@ -133,8 +138,8 @@ export class Cache<T> {
}
class SetCache {
private key: string;
private fetcher: () => Promise<string[]>;
private readonly key: string;
private readonly fetcher: () => Promise<string[]>;
protected constructor(
name: string,
@ -147,36 +152,38 @@ class SetCache {
protected async fetch() {
// Sync from DB if nothing is cached yet or cache is expired
const ttlKey = `${this.key}:fetched`;
if (
!(await this.hasFollowing()) ||
(await redisClient.exists(ttlKey)) === 0
) {
await redisClient.del(this.key);
await this.follow(...(await this.fetcher()));
await redisClient.set(ttlKey, "yes", "EX", 60 * 30); // Expires in 30 minutes
if (!(await this.exists())) {
await this.clear();
await this.add(...(await this.fetcher()));
}
}
public async follow(...targetIds: string[]) {
public async add(...targetIds: string[]) {
if (targetIds.length > 0) {
// This is no-op if targets are already in cache
await redisClient.sadd(this.key, targetIds);
}
if ((await redisClient.ttl(this.key)) < 0) {
await redisClient.expire(this.key, 60 * 30); // Expires in 30 minutes
}
}
public async unfollow(...targetIds: string[]) {
public async delete(...targetIds: string[]) {
if (targetIds.length > 0) {
// This is no-op if targets are not in cache
await redisClient.srem(this.key, targetIds);
}
}
public async isFollowing(targetId: string): Promise<boolean> {
public async clear() {
await redisClient.del(this.key);
}
public async has(targetId: string): Promise<boolean> {
return (await redisClient.sismember(this.key, targetId)) === 1;
}
public async hasFollowing(): Promise<boolean> {
public async exists(): Promise<boolean> {
return (await redisClient.scard(this.key)) !== 0;
}
@ -185,13 +192,74 @@ class SetCache {
}
}
class HashCache {
private readonly key: string;
private readonly fetcher: () => Promise<Map<string, string>>;
protected constructor(
name: string,
userId: string,
fetcher: () => Promise<Map<string, string>>,
) {
this.key = `hashcache:${name}:${userId}`;
this.fetcher = fetcher;
}
protected async fetch() {
// Sync from DB if nothing is cached yet or cache is expired
if (!(await this.exists())) {
await redisClient.del(this.key);
await this.setHash(await this.fetcher());
}
}
public async exists(): Promise<boolean> {
return (await redisClient.hlen(this.key)) > 0;
}
public async setHash(hash: Map<string, string>) {
if (hash.size > 0) {
await redisClient.hset(this.key, hash);
}
if ((await redisClient.ttl(this.key)) < 0) {
await redisClient.expire(this.key, 60 * 30); // Expires in 30 minutes
}
}
public async set(field: string, value: string) {
await this.setHash(new Map([[field, value]]));
}
public async delete(...fields: string[]) {
await redisClient.hdel(this.key, ...fields);
}
public async clear() {
await redisClient.del(this.key);
}
public async get(...fields: string[]): Promise<Map<string, string>> {
let pairs: [string, string][] = [];
if (fields.length > 0) {
pairs = (await redisClient.hmget(this.key, ...fields))
.map((v, i) => [fields[i], v] as [string, string | null])
.filter(([_, value]) => value !== null) as [string, string][];
} else {
pairs = Object.entries(await redisClient.hgetall(this.key));
}
return new Map(pairs);
}
}
export class LocalFollowingsCache extends SetCache {
private constructor(userId: string) {
const fetcher = () =>
Followings.find({
select: { followeeId: true },
where: { followerId: userId, followerHost: IsNull() },
}).then((follows) => follows.map((follow) => follow.followeeId));
}).then((follows) => follows.map(({ followeeId }) => followeeId));
super("follow", userId, fetcher);
}
@ -212,7 +280,7 @@ export class ChannelFollowingsCache extends SetCache {
where: {
followerId: userId,
},
}).then((follows) => follows.map((follow) => follow.followeeId));
}).then((follows) => follows.map(({ followeeId }) => followeeId));
super("channel", userId, fetcher);
}
@ -224,3 +292,89 @@ export class ChannelFollowingsCache extends SetCache {
return cache;
}
}
export class UserMutingsCache extends HashCache {
private constructor(userId: string) {
const fetcher = () =>
Mutings.find({
select: { muteeId: true, expiresAt: true },
where: { muterId: userId },
}).then(
(mutes) =>
new Map(
mutes.map(({ muteeId, expiresAt }) => [
muteeId,
expiresAt?.toISOString() ?? "",
]),
),
);
super("mute", userId, fetcher);
}
public static async init(userId: string): Promise<UserMutingsCache> {
const cache = new UserMutingsCache(userId);
await cache.fetch();
return cache;
}
public async mute(muteeId: string, expiresAt?: Date | null) {
await this.set(muteeId, expiresAt?.toISOString() ?? "");
}
public async unmute(muteeId: string) {
await this.delete(muteeId);
}
public async getAll(): Promise<string[]> {
const mutes = await this.get();
const expired: string[] = [];
const valid: string[] = [];
for (const [k, v] of mutes.entries()) {
if (v !== "" && new Date(v) < new Date()) {
expired.push(k);
} else {
valid.push(k);
}
}
await this.delete(...expired);
return valid;
}
public async isMuting(muteeId: string): Promise<boolean> {
const result = (await this.get(muteeId)).get(muteeId); // Could be undefined or ""
let muting = result === "";
if (result) {
muting = new Date(result) > new Date(); // Check if not expired yet
if (!muting) {
await this.unmute(muteeId);
}
}
return muting;
}
}
export class InstanceMutingsCache extends SetCache {
private constructor(userId: string) {
const fetcher = () =>
UserProfiles.findOne({
select: { mutedInstances: true },
where: { userId },
}).then((profile) => (profile ? profile.mutedInstances : []));
super("instanceMute", userId, fetcher);
}
public static async init(userId: string): Promise<InstanceMutingsCache> {
const cache = new InstanceMutingsCache(userId);
await cache.fetch();
return cache;
}
}

View file

@ -139,7 +139,7 @@ export const NoteRepository = db.getRepository(Note).extend({
if (Users.isLocalUser(user)) {
const cache = await LocalFollowingsCache.init(meId);
return await cache.isFollowing(note.userId);
return await cache.has(note.userId);
}
const following = await Followings.exist({

View file

@ -10,6 +10,7 @@ import type { DbUserImportJobData } from "@/queue/types.js";
import type { User } from "@/models/entities/user.js";
import { genId } from "@/misc/gen-id.js";
import { IsNull } from "typeorm";
import { UserMutingsCache } from "@/misc/cache.js";
const logger = queueLogger.createSubLogger("import-muting");
@ -86,4 +87,6 @@ async function mute(user: User, target: User) {
muterId: user.id,
muteeId: target.id,
});
const cache = await UserMutingsCache.init(user.id);
await cache.mute(target.id);
}

View file

@ -9,6 +9,7 @@ import {
prepared,
scyllaClient,
} from "@/db/scylla.js";
import { userByIdCache } from "@/services/user-cache.js";
/**
* Get note for API processing, taking into account visibility.
@ -59,9 +60,11 @@ export async function getNote(
* Get user for API processing
*/
export async function getUser(userId: User["id"]) {
const user = await Users.findOneBy({ id: userId });
const user = await userByIdCache.fetchMaybe(userId, () =>
Users.findOneBy({ id: userId }).then((u) => u ?? undefined),
);
if (user == null) {
if (!user) {
throw new IdentifiableError(
"15348ddd-432d-49c2-8a5a-8069753becff",
"No such user.",

View file

@ -48,7 +48,7 @@ export default define(meta, paramDef, async (ps, user) => {
if (scyllaClient) {
const cache = await ChannelFollowingsCache.init(user.id);
await cache.follow(channel.id);
await cache.add(channel.id);
}
publishUserEvent(user.id, "followChannel", channel);

View file

@ -45,7 +45,7 @@ export default define(meta, paramDef, async (ps, user) => {
if (scyllaClient) {
const cache = await ChannelFollowingsCache.init(user.id);
await cache.unfollow(channel.id);
await cache.delete(channel.id);
}
publishUserEvent(user.id, "unfollowChannel", channel);

View file

@ -84,7 +84,7 @@ export default define(meta, paramDef, async (ps, user) => {
// Check if already following
const cache = await LocalFollowingsCache.init(follower.id);
const exist = await cache.isFollowing(followee.id);
const exist = await cache.has(followee.id);
if (exist) {
throw new ApiError(meta.errors.alreadyFollowing);

View file

@ -71,7 +71,7 @@ export default define(meta, paramDef, async (ps, user) => {
// Check not following
const cache = await LocalFollowingsCache.init(follower.id);
const exist = await cache.isFollowing(followee.id);
const exist = await cache.has(followee.id);
if (!exist) {
throw new ApiError(meta.errors.notFollowing);

View file

@ -14,9 +14,9 @@ import { normalizeForSearch } from "@/misc/normalize-for-search.js";
import { langmap } from "@/misc/langmap.js";
import { verifyLink } from "@/services/fetch-rel-me.js";
import { ApiError } from "../../error.js";
import config from "@/config/index.js";
import define from "../../define.js";
import { userByIdCache, userDenormalizedCache } from "@/services/user-cache.js";
import { InstanceMutingsCache } from "@/misc/cache.js";
export const meta = {
tags: ["account"],
@ -323,9 +323,17 @@ export default define(meta, paramDef, async (ps, _user, token) => {
}
await userDenormalizedCache.set(data.id, data);
}
if (Object.keys(profileUpdates).length > 0)
if (Object.keys(profileUpdates).length > 0) {
await UserProfiles.update(user.id, profileUpdates);
if (profileUpdates.mutedInstances) {
const cache = await InstanceMutingsCache.init(user.id);
await cache.clear();
await cache.add(...profileUpdates.mutedInstances);
}
}
const iObj = await Users.pack<true, true>(user.id, user, {
detail: true,
includeSecrets: isSecure,

View file

@ -5,6 +5,7 @@ import { genId } from "@/misc/gen-id.js";
import { Mutings, NoteWatchings } from "@/models/index.js";
import type { Muting } from "@/models/entities/muting.js";
import { publishUserEvent } from "@/services/stream.js";
import { UserMutingsCache } from "@/misc/cache.js";
export const meta = {
tags: ["account"],
@ -64,12 +65,8 @@ export default define(meta, paramDef, async (ps, user) => {
});
// Check if already muting
const exist = await Mutings.exist({
where: {
muterId: muter.id,
muteeId: mutee.id,
},
});
const cache = await UserMutingsCache.init(muter.id);
const exist = await cache.isMuting(mutee.id);
if (exist) {
throw new ApiError(meta.errors.alreadyMuting);
@ -79,14 +76,17 @@ export default define(meta, paramDef, async (ps, user) => {
return;
}
const expiresAt = ps.expiresAt ? new Date(ps.expiresAt) : null;
// Create mute
await Mutings.insert({
id: genId(),
createdAt: new Date(),
expiresAt: ps.expiresAt ? new Date(ps.expiresAt) : null,
expiresAt,
muterId: muter.id,
muteeId: mutee.id,
} as Muting);
await cache.mute(mutee.id, expiresAt);
publishUserEvent(user.id, "mute", mutee);

View file

@ -3,6 +3,7 @@ import { ApiError } from "../../error.js";
import { getUser } from "../../common/getters.js";
import { Mutings } from "@/models/index.js";
import { publishUserEvent } from "@/services/stream.js";
import { UserMutingsCache } from "@/misc/cache.js";
export const meta = {
tags: ["account"],
@ -56,19 +57,19 @@ export default define(meta, paramDef, async (ps, user) => {
});
// Check not muting
const muting = await Mutings.findOneBy({
muterId: muter.id,
muteeId: mutee.id,
});
const cache = await UserMutingsCache.init(muter.id);
const muting = await cache.isMuting(mutee.id);
if (muting == null) {
if (!muting) {
throw new ApiError(meta.errors.notMuting);
}
// Delete mute
await Mutings.delete({
id: muting.id,
muterId: muter.id,
muteeId: mutee.id,
});
await cache.unmute(mutee.id);
publishUserEvent(user.id, "unmute", mutee);
});

View file

@ -18,6 +18,7 @@ import {
filterReply,
filterVisibility,
execTimelineQuery,
filterMutedUser,
} from "@/db/scylla.js";
import { ChannelFollowingsCache, LocalFollowingsCache } from "@/misc/cache.js";
@ -86,6 +87,7 @@ export default define(meta, paramDef, async (ps, user) => {
filtered = await filterChannel(filtered, user, followingChannelIds);
filtered = await filterReply(filtered, ps.withReplies, user);
filtered = await filterVisibility(filtered, user, followingUserIds);
filtered = await filterMutedUser(filtered, user);
return filtered;
};
@ -95,7 +97,7 @@ export default define(meta, paramDef, async (ps, user) => {
});
}
const hasFollowing = await followingsCache.hasFollowing();
const hasFollowing = await followingsCache.exists();
//#region Construct query
const followingQuery = Followings.createQueryBuilder("following")

View file

@ -85,7 +85,7 @@ export async function insertFollowingDoc(
if (Users.isLocalUser(follower)) {
// Cache following ID set
const cache = await LocalFollowingsCache.init(follower.id);
await cache.follow(followee.id);
await cache.add(followee.id);
}
const req = await FollowRequests.findOneBy({

View file

@ -48,7 +48,7 @@ export default async function (
if (Users.isLocalUser(follower)) {
const cache = await LocalFollowingsCache.init(follower.id);
await cache.unfollow(followee.id);
await cache.delete(followee.id);
}
decrementFollowing(follower, followee);

View file

@ -94,7 +94,7 @@ async function removeFollow(followee: Both, follower: Both) {
await Followings.delete(following.id);
if (Users.isLocalUser(follower)) {
const cache = await LocalFollowingsCache.init(follower.id);
await cache.unfollow(followee.id);
await cache.delete(followee.id);
}
decrementFollowing(follower, followee);
}