diff --git a/packages/backend/src/db/scylla.ts b/packages/backend/src/db/scylla.ts index 46835e5eb7..4de7d10ff7 100644 --- a/packages/backend/src/db/scylla.ts +++ b/packages/backend/src/db/scylla.ts @@ -5,7 +5,12 @@ import type { Note } from "@/models/entities/note.js"; import type { NoteReaction } from "@/models/entities/note-reaction.js"; import { Client, types, tracker } from "cassandra-driver"; import type { User } from "@/models/entities/user.js"; -import { ChannelFollowingsCache, LocalFollowingsCache } from "@/misc/cache.js"; +import { + ChannelFollowingsCache, + InstanceMutingsCache, + LocalFollowingsCache, + UserMutingsCache, +} from "@/misc/cache.js"; import { getTimestamp } from "@/misc/gen-id.js"; import Logger from "@/services/logger.js"; @@ -267,13 +272,12 @@ export async function execTimelineQuery( let { query, untilDate, sinceDate } = prepareTimelineQuery(ps); - let scannedPartitions = 0; + let scannedEmptyPartitions = 0; const foundNotes: ScyllaNote[] = []; // Try to get posts of at most in the single request - while (foundNotes.length < ps.limit && scannedPartitions < maxDays) { + while (foundNotes.length < ps.limit && scannedEmptyPartitions < maxDays) { const params: (Date | string | string[])[] = [untilDate, untilDate]; - if (sinceDate) { params.push(sinceDate); } @@ -284,7 +288,7 @@ export async function execTimelineQuery( if (result.rowLength === 0) { // Reached the end of partition. Queries posts created one day before. - scannedPartitions++; + scannedEmptyPartitions++; untilDate = new Date( untilDate.getFullYear(), untilDate.getMonth(), @@ -299,6 +303,8 @@ export async function execTimelineQuery( continue; } + scannedEmptyPartitions = 0; + const notes = result.rows.map(parseScyllaNote); foundNotes.push(...(filter ? await filter(notes) : notes)); untilDate = notes[notes.length - 1].createdAt; @@ -392,3 +398,29 @@ export async function filterReply( return filtered; } + +export async function filterMutedUser( + notes: ScyllaNote[], + user: { id: User["id"] }, + exclude?: User, +) { + const userCache = await UserMutingsCache.init(user.id); + let mutedUserIds = await userCache.getAll(); + + if (exclude) { + mutedUserIds = mutedUserIds.filter((id) => id !== exclude.id); + } + + const instanceCache = await InstanceMutingsCache.init(user.id); + const mutedInstances = await instanceCache.getAll(); + + return notes.filter( + (note) => + !mutedUserIds.includes(note.userId) && + !(note.replyUserId && mutedUserIds.includes(note.replyUserId)) && + !(note.renoteUserId && mutedUserIds.includes(note.renoteUserId)) && + !(note.userHost && mutedInstances.includes(note.userHost)) && + !(note.replyUserHost && mutedInstances.includes(note.replyUserHost)) && + !(note.renoteUserHost && mutedInstances.includes(note.renoteUserHost)), + ); +} diff --git a/packages/backend/src/misc/cache.ts b/packages/backend/src/misc/cache.ts index 26301939ef..8b9aa37c19 100644 --- a/packages/backend/src/misc/cache.ts +++ b/packages/backend/src/misc/cache.ts @@ -1,7 +1,12 @@ import { redisClient } from "@/db/redis.js"; import { encode, decode } from "msgpackr"; import { ChainableCommander } from "ioredis"; -import { ChannelFollowings, Followings } from "@/models/index.js"; +import { + ChannelFollowings, + Followings, + Mutings, + UserProfiles, +} from "@/models/index.js"; import { IsNull } from "typeorm"; export class Cache { @@ -133,8 +138,8 @@ export class Cache { } class SetCache { - private key: string; - private fetcher: () => Promise; + private readonly key: string; + private readonly fetcher: () => Promise; protected constructor( name: string, @@ -147,36 +152,38 @@ class SetCache { protected async fetch() { // Sync from DB if nothing is cached yet or cache is expired - const ttlKey = `${this.key}:fetched`; - if ( - !(await this.hasFollowing()) || - (await redisClient.exists(ttlKey)) === 0 - ) { - await redisClient.del(this.key); - await this.follow(...(await this.fetcher())); - await redisClient.set(ttlKey, "yes", "EX", 60 * 30); // Expires in 30 minutes + if (!(await this.exists())) { + await this.clear(); + await this.add(...(await this.fetcher())); } } - public async follow(...targetIds: string[]) { + public async add(...targetIds: string[]) { if (targetIds.length > 0) { // This is no-op if targets are already in cache await redisClient.sadd(this.key, targetIds); } + if ((await redisClient.ttl(this.key)) < 0) { + await redisClient.expire(this.key, 60 * 30); // Expires in 30 minutes + } } - public async unfollow(...targetIds: string[]) { + public async delete(...targetIds: string[]) { if (targetIds.length > 0) { // This is no-op if targets are not in cache await redisClient.srem(this.key, targetIds); } } - public async isFollowing(targetId: string): Promise { + public async clear() { + await redisClient.del(this.key); + } + + public async has(targetId: string): Promise { return (await redisClient.sismember(this.key, targetId)) === 1; } - public async hasFollowing(): Promise { + public async exists(): Promise { return (await redisClient.scard(this.key)) !== 0; } @@ -185,13 +192,74 @@ class SetCache { } } +class HashCache { + private readonly key: string; + private readonly fetcher: () => Promise>; + + protected constructor( + name: string, + userId: string, + fetcher: () => Promise>, + ) { + this.key = `hashcache:${name}:${userId}`; + this.fetcher = fetcher; + } + + protected async fetch() { + // Sync from DB if nothing is cached yet or cache is expired + if (!(await this.exists())) { + await redisClient.del(this.key); + await this.setHash(await this.fetcher()); + } + } + + public async exists(): Promise { + return (await redisClient.hlen(this.key)) > 0; + } + + public async setHash(hash: Map) { + if (hash.size > 0) { + await redisClient.hset(this.key, hash); + } + if ((await redisClient.ttl(this.key)) < 0) { + await redisClient.expire(this.key, 60 * 30); // Expires in 30 minutes + } + } + + public async set(field: string, value: string) { + await this.setHash(new Map([[field, value]])); + } + + public async delete(...fields: string[]) { + await redisClient.hdel(this.key, ...fields); + } + + public async clear() { + await redisClient.del(this.key); + } + + public async get(...fields: string[]): Promise> { + let pairs: [string, string][] = []; + + if (fields.length > 0) { + pairs = (await redisClient.hmget(this.key, ...fields)) + .map((v, i) => [fields[i], v] as [string, string | null]) + .filter(([_, value]) => value !== null) as [string, string][]; + } else { + pairs = Object.entries(await redisClient.hgetall(this.key)); + } + + return new Map(pairs); + } +} + export class LocalFollowingsCache extends SetCache { private constructor(userId: string) { const fetcher = () => Followings.find({ select: { followeeId: true }, where: { followerId: userId, followerHost: IsNull() }, - }).then((follows) => follows.map((follow) => follow.followeeId)); + }).then((follows) => follows.map(({ followeeId }) => followeeId)); super("follow", userId, fetcher); } @@ -212,7 +280,7 @@ export class ChannelFollowingsCache extends SetCache { where: { followerId: userId, }, - }).then((follows) => follows.map((follow) => follow.followeeId)); + }).then((follows) => follows.map(({ followeeId }) => followeeId)); super("channel", userId, fetcher); } @@ -224,3 +292,89 @@ export class ChannelFollowingsCache extends SetCache { return cache; } } + +export class UserMutingsCache extends HashCache { + private constructor(userId: string) { + const fetcher = () => + Mutings.find({ + select: { muteeId: true, expiresAt: true }, + where: { muterId: userId }, + }).then( + (mutes) => + new Map( + mutes.map(({ muteeId, expiresAt }) => [ + muteeId, + expiresAt?.toISOString() ?? "", + ]), + ), + ); + + super("mute", userId, fetcher); + } + + public static async init(userId: string): Promise { + const cache = new UserMutingsCache(userId); + await cache.fetch(); + + return cache; + } + + public async mute(muteeId: string, expiresAt?: Date | null) { + await this.set(muteeId, expiresAt?.toISOString() ?? ""); + } + + public async unmute(muteeId: string) { + await this.delete(muteeId); + } + + public async getAll(): Promise { + const mutes = await this.get(); + const expired: string[] = []; + const valid: string[] = []; + + for (const [k, v] of mutes.entries()) { + if (v !== "" && new Date(v) < new Date()) { + expired.push(k); + } else { + valid.push(k); + } + } + + await this.delete(...expired); + + return valid; + } + + public async isMuting(muteeId: string): Promise { + const result = (await this.get(muteeId)).get(muteeId); // Could be undefined or "" + let muting = result === ""; + + if (result) { + muting = new Date(result) > new Date(); // Check if not expired yet + if (!muting) { + await this.unmute(muteeId); + } + } + + return muting; + } +} + +export class InstanceMutingsCache extends SetCache { + private constructor(userId: string) { + const fetcher = () => + UserProfiles.findOne({ + select: { mutedInstances: true }, + where: { userId }, + }).then((profile) => (profile ? profile.mutedInstances : [])); + + super("instanceMute", userId, fetcher); + } + + public static async init(userId: string): Promise { + const cache = new InstanceMutingsCache(userId); + await cache.fetch(); + + return cache; + } +} diff --git a/packages/backend/src/models/repositories/note.ts b/packages/backend/src/models/repositories/note.ts index a1923d0168..ba7b9937ac 100644 --- a/packages/backend/src/models/repositories/note.ts +++ b/packages/backend/src/models/repositories/note.ts @@ -139,7 +139,7 @@ export const NoteRepository = db.getRepository(Note).extend({ if (Users.isLocalUser(user)) { const cache = await LocalFollowingsCache.init(meId); - return await cache.isFollowing(note.userId); + return await cache.has(note.userId); } const following = await Followings.exist({ diff --git a/packages/backend/src/queue/processors/db/import-muting.ts b/packages/backend/src/queue/processors/db/import-muting.ts index 80e0567397..0fb2298097 100644 --- a/packages/backend/src/queue/processors/db/import-muting.ts +++ b/packages/backend/src/queue/processors/db/import-muting.ts @@ -10,6 +10,7 @@ import type { DbUserImportJobData } from "@/queue/types.js"; import type { User } from "@/models/entities/user.js"; import { genId } from "@/misc/gen-id.js"; import { IsNull } from "typeorm"; +import { UserMutingsCache } from "@/misc/cache.js"; const logger = queueLogger.createSubLogger("import-muting"); @@ -86,4 +87,6 @@ async function mute(user: User, target: User) { muterId: user.id, muteeId: target.id, }); + const cache = await UserMutingsCache.init(user.id); + await cache.mute(target.id); } diff --git a/packages/backend/src/server/api/common/getters.ts b/packages/backend/src/server/api/common/getters.ts index d15d94b03c..09595c2b27 100644 --- a/packages/backend/src/server/api/common/getters.ts +++ b/packages/backend/src/server/api/common/getters.ts @@ -9,6 +9,7 @@ import { prepared, scyllaClient, } from "@/db/scylla.js"; +import { userByIdCache } from "@/services/user-cache.js"; /** * Get note for API processing, taking into account visibility. @@ -59,9 +60,11 @@ export async function getNote( * Get user for API processing */ export async function getUser(userId: User["id"]) { - const user = await Users.findOneBy({ id: userId }); + const user = await userByIdCache.fetchMaybe(userId, () => + Users.findOneBy({ id: userId }).then((u) => u ?? undefined), + ); - if (user == null) { + if (!user) { throw new IdentifiableError( "15348ddd-432d-49c2-8a5a-8069753becff", "No such user.", diff --git a/packages/backend/src/server/api/endpoints/channels/follow.ts b/packages/backend/src/server/api/endpoints/channels/follow.ts index 1d8e2eb427..b2e1907fa0 100644 --- a/packages/backend/src/server/api/endpoints/channels/follow.ts +++ b/packages/backend/src/server/api/endpoints/channels/follow.ts @@ -48,7 +48,7 @@ export default define(meta, paramDef, async (ps, user) => { if (scyllaClient) { const cache = await ChannelFollowingsCache.init(user.id); - await cache.follow(channel.id); + await cache.add(channel.id); } publishUserEvent(user.id, "followChannel", channel); diff --git a/packages/backend/src/server/api/endpoints/channels/unfollow.ts b/packages/backend/src/server/api/endpoints/channels/unfollow.ts index e5f5d518c5..6a6b7870ba 100644 --- a/packages/backend/src/server/api/endpoints/channels/unfollow.ts +++ b/packages/backend/src/server/api/endpoints/channels/unfollow.ts @@ -45,7 +45,7 @@ export default define(meta, paramDef, async (ps, user) => { if (scyllaClient) { const cache = await ChannelFollowingsCache.init(user.id); - await cache.unfollow(channel.id); + await cache.delete(channel.id); } publishUserEvent(user.id, "unfollowChannel", channel); diff --git a/packages/backend/src/server/api/endpoints/following/create.ts b/packages/backend/src/server/api/endpoints/following/create.ts index f82024b621..c716372bf5 100644 --- a/packages/backend/src/server/api/endpoints/following/create.ts +++ b/packages/backend/src/server/api/endpoints/following/create.ts @@ -84,7 +84,7 @@ export default define(meta, paramDef, async (ps, user) => { // Check if already following const cache = await LocalFollowingsCache.init(follower.id); - const exist = await cache.isFollowing(followee.id); + const exist = await cache.has(followee.id); if (exist) { throw new ApiError(meta.errors.alreadyFollowing); diff --git a/packages/backend/src/server/api/endpoints/following/delete.ts b/packages/backend/src/server/api/endpoints/following/delete.ts index 0cb6c58863..f17b471f35 100644 --- a/packages/backend/src/server/api/endpoints/following/delete.ts +++ b/packages/backend/src/server/api/endpoints/following/delete.ts @@ -71,7 +71,7 @@ export default define(meta, paramDef, async (ps, user) => { // Check not following const cache = await LocalFollowingsCache.init(follower.id); - const exist = await cache.isFollowing(followee.id); + const exist = await cache.has(followee.id); if (!exist) { throw new ApiError(meta.errors.notFollowing); diff --git a/packages/backend/src/server/api/endpoints/i/update.ts b/packages/backend/src/server/api/endpoints/i/update.ts index d6f1f2d783..a53be2d008 100644 --- a/packages/backend/src/server/api/endpoints/i/update.ts +++ b/packages/backend/src/server/api/endpoints/i/update.ts @@ -14,9 +14,9 @@ import { normalizeForSearch } from "@/misc/normalize-for-search.js"; import { langmap } from "@/misc/langmap.js"; import { verifyLink } from "@/services/fetch-rel-me.js"; import { ApiError } from "../../error.js"; -import config from "@/config/index.js"; import define from "../../define.js"; import { userByIdCache, userDenormalizedCache } from "@/services/user-cache.js"; +import { InstanceMutingsCache } from "@/misc/cache.js"; export const meta = { tags: ["account"], @@ -323,9 +323,17 @@ export default define(meta, paramDef, async (ps, _user, token) => { } await userDenormalizedCache.set(data.id, data); } - if (Object.keys(profileUpdates).length > 0) + + if (Object.keys(profileUpdates).length > 0) { await UserProfiles.update(user.id, profileUpdates); + if (profileUpdates.mutedInstances) { + const cache = await InstanceMutingsCache.init(user.id); + await cache.clear(); + await cache.add(...profileUpdates.mutedInstances); + } + } + const iObj = await Users.pack(user.id, user, { detail: true, includeSecrets: isSecure, diff --git a/packages/backend/src/server/api/endpoints/mute/create.ts b/packages/backend/src/server/api/endpoints/mute/create.ts index 7b2f109053..eb79a7f314 100644 --- a/packages/backend/src/server/api/endpoints/mute/create.ts +++ b/packages/backend/src/server/api/endpoints/mute/create.ts @@ -5,6 +5,7 @@ import { genId } from "@/misc/gen-id.js"; import { Mutings, NoteWatchings } from "@/models/index.js"; import type { Muting } from "@/models/entities/muting.js"; import { publishUserEvent } from "@/services/stream.js"; +import { UserMutingsCache } from "@/misc/cache.js"; export const meta = { tags: ["account"], @@ -64,12 +65,8 @@ export default define(meta, paramDef, async (ps, user) => { }); // Check if already muting - const exist = await Mutings.exist({ - where: { - muterId: muter.id, - muteeId: mutee.id, - }, - }); + const cache = await UserMutingsCache.init(muter.id); + const exist = await cache.isMuting(mutee.id); if (exist) { throw new ApiError(meta.errors.alreadyMuting); @@ -79,14 +76,17 @@ export default define(meta, paramDef, async (ps, user) => { return; } + const expiresAt = ps.expiresAt ? new Date(ps.expiresAt) : null; + // Create mute await Mutings.insert({ id: genId(), createdAt: new Date(), - expiresAt: ps.expiresAt ? new Date(ps.expiresAt) : null, + expiresAt, muterId: muter.id, muteeId: mutee.id, } as Muting); + await cache.mute(mutee.id, expiresAt); publishUserEvent(user.id, "mute", mutee); diff --git a/packages/backend/src/server/api/endpoints/mute/delete.ts b/packages/backend/src/server/api/endpoints/mute/delete.ts index cd00c1a8ab..470dedc035 100644 --- a/packages/backend/src/server/api/endpoints/mute/delete.ts +++ b/packages/backend/src/server/api/endpoints/mute/delete.ts @@ -3,6 +3,7 @@ import { ApiError } from "../../error.js"; import { getUser } from "../../common/getters.js"; import { Mutings } from "@/models/index.js"; import { publishUserEvent } from "@/services/stream.js"; +import { UserMutingsCache } from "@/misc/cache.js"; export const meta = { tags: ["account"], @@ -56,19 +57,19 @@ export default define(meta, paramDef, async (ps, user) => { }); // Check not muting - const muting = await Mutings.findOneBy({ - muterId: muter.id, - muteeId: mutee.id, - }); + const cache = await UserMutingsCache.init(muter.id); + const muting = await cache.isMuting(mutee.id); - if (muting == null) { + if (!muting) { throw new ApiError(meta.errors.notMuting); } // Delete mute await Mutings.delete({ - id: muting.id, + muterId: muter.id, + muteeId: mutee.id, }); + await cache.unmute(mutee.id); publishUserEvent(user.id, "unmute", mutee); }); diff --git a/packages/backend/src/server/api/endpoints/notes/timeline.ts b/packages/backend/src/server/api/endpoints/notes/timeline.ts index 93d868684a..ff0fd660d3 100644 --- a/packages/backend/src/server/api/endpoints/notes/timeline.ts +++ b/packages/backend/src/server/api/endpoints/notes/timeline.ts @@ -18,6 +18,7 @@ import { filterReply, filterVisibility, execTimelineQuery, + filterMutedUser, } from "@/db/scylla.js"; import { ChannelFollowingsCache, LocalFollowingsCache } from "@/misc/cache.js"; @@ -86,6 +87,7 @@ export default define(meta, paramDef, async (ps, user) => { filtered = await filterChannel(filtered, user, followingChannelIds); filtered = await filterReply(filtered, ps.withReplies, user); filtered = await filterVisibility(filtered, user, followingUserIds); + filtered = await filterMutedUser(filtered, user); return filtered; }; @@ -95,7 +97,7 @@ export default define(meta, paramDef, async (ps, user) => { }); } - const hasFollowing = await followingsCache.hasFollowing(); + const hasFollowing = await followingsCache.exists(); //#region Construct query const followingQuery = Followings.createQueryBuilder("following") diff --git a/packages/backend/src/services/following/create.ts b/packages/backend/src/services/following/create.ts index 8a5b68ab90..41a35eeeb0 100644 --- a/packages/backend/src/services/following/create.ts +++ b/packages/backend/src/services/following/create.ts @@ -85,7 +85,7 @@ export async function insertFollowingDoc( if (Users.isLocalUser(follower)) { // Cache following ID set const cache = await LocalFollowingsCache.init(follower.id); - await cache.follow(followee.id); + await cache.add(followee.id); } const req = await FollowRequests.findOneBy({ diff --git a/packages/backend/src/services/following/delete.ts b/packages/backend/src/services/following/delete.ts index 1ba874b207..98158307af 100644 --- a/packages/backend/src/services/following/delete.ts +++ b/packages/backend/src/services/following/delete.ts @@ -48,7 +48,7 @@ export default async function ( if (Users.isLocalUser(follower)) { const cache = await LocalFollowingsCache.init(follower.id); - await cache.unfollow(followee.id); + await cache.delete(followee.id); } decrementFollowing(follower, followee); diff --git a/packages/backend/src/services/following/reject.ts b/packages/backend/src/services/following/reject.ts index ca08a26e84..a195796704 100644 --- a/packages/backend/src/services/following/reject.ts +++ b/packages/backend/src/services/following/reject.ts @@ -94,7 +94,7 @@ async function removeFollow(followee: Both, follower: Both) { await Followings.delete(following.id); if (Users.isLocalUser(follower)) { const cache = await LocalFollowingsCache.init(follower.id); - await cache.unfollow(followee.id); + await cache.delete(followee.id); } decrementFollowing(follower, followee); }