perf: cache following channels
This commit is contained in:
parent
af70257c6d
commit
d89e24f796
4 changed files with 89 additions and 20 deletions
|
@ -1,7 +1,7 @@
|
||||||
import { redisClient } from "@/db/redis.js";
|
import { redisClient } from "@/db/redis.js";
|
||||||
import { encode, decode } from "msgpackr";
|
import { encode, decode } from "msgpackr";
|
||||||
import { ChainableCommander } from "ioredis";
|
import { ChainableCommander } from "ioredis";
|
||||||
import { Followings } from "@/models/index.js";
|
import { ChannelFollowings, Followings } from "@/models/index.js";
|
||||||
import { IsNull } from "typeorm";
|
import { IsNull } from "typeorm";
|
||||||
|
|
||||||
export class Cache<T> {
|
export class Cache<T> {
|
||||||
|
@ -132,28 +132,29 @@ export class Cache<T> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
export class LocalFollowingsCache {
|
class SetCache {
|
||||||
private myId: string;
|
|
||||||
private key: string;
|
private key: string;
|
||||||
|
private fetcher: () => Promise<string[]>;
|
||||||
|
|
||||||
private constructor(userId: string) {
|
protected constructor(
|
||||||
this.myId = userId;
|
name: string,
|
||||||
this.key = `follow:${userId}`;
|
userId: string,
|
||||||
|
fetcher: () => Promise<string[]>,
|
||||||
|
) {
|
||||||
|
this.key = `setcache:${name}:${userId}`;
|
||||||
|
this.fetcher = fetcher;
|
||||||
}
|
}
|
||||||
|
|
||||||
public static async init(userId: string): Promise<LocalFollowingsCache> {
|
protected async fetch() {
|
||||||
const cache = new LocalFollowingsCache(userId);
|
// Sync from DB if nothing is cached yet or cache is expired
|
||||||
|
const ttlKey = `${this.key}:fetched`;
|
||||||
|
const fetched = await redisClient.exists(ttlKey);
|
||||||
|
|
||||||
// Sync from DB if no followings are cached
|
if (!(await this.hasFollowing()) || fetched === 0) {
|
||||||
if (!(await cache.hasFollowing())) {
|
await redisClient.del(this.key);
|
||||||
const rel = await Followings.find({
|
await this.follow(...(await this.fetcher()));
|
||||||
select: { followeeId: true },
|
await redisClient.set(ttlKey, "yes", "EX", 60 * 30);
|
||||||
where: { followerId: cache.myId, followerHost: IsNull() },
|
|
||||||
});
|
|
||||||
await cache.follow(...rel.map((r) => r.followeeId));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return cache;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public async follow(...targetIds: string[]) {
|
public async follow(...targetIds: string[]) {
|
||||||
|
@ -182,3 +183,43 @@ export class LocalFollowingsCache {
|
||||||
return await redisClient.smembers(this.key);
|
return await redisClient.smembers(this.key);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
export class LocalFollowingsCache extends SetCache {
|
||||||
|
private constructor(userId: string) {
|
||||||
|
const fetcher = () =>
|
||||||
|
Followings.find({
|
||||||
|
select: { followeeId: true },
|
||||||
|
where: { followerId: userId, followerHost: IsNull() },
|
||||||
|
}).then((follows) => follows.map((follow) => follow.followeeId));
|
||||||
|
|
||||||
|
super("follow", userId, fetcher);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static async init(userId: string): Promise<LocalFollowingsCache> {
|
||||||
|
const cache = new LocalFollowingsCache(userId);
|
||||||
|
await cache.fetch();
|
||||||
|
|
||||||
|
return cache;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
export class ChannelFollowingsCache extends SetCache {
|
||||||
|
private constructor(userId: string) {
|
||||||
|
const fetcher = () =>
|
||||||
|
ChannelFollowings.find({
|
||||||
|
select: { followeeId: true },
|
||||||
|
where: {
|
||||||
|
followerId: userId,
|
||||||
|
},
|
||||||
|
}).then((follows) => follows.map((follow) => follow.followeeId));
|
||||||
|
|
||||||
|
super("channel", userId, fetcher);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static async init(userId: string): Promise<ChannelFollowingsCache> {
|
||||||
|
const cache = new ChannelFollowingsCache(userId);
|
||||||
|
await cache.fetch();
|
||||||
|
|
||||||
|
return cache;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -3,6 +3,8 @@ import { ApiError } from "../../error.js";
|
||||||
import { Channels, ChannelFollowings } from "@/models/index.js";
|
import { Channels, ChannelFollowings } from "@/models/index.js";
|
||||||
import { genId } from "@/misc/gen-id.js";
|
import { genId } from "@/misc/gen-id.js";
|
||||||
import { publishUserEvent } from "@/services/stream.js";
|
import { publishUserEvent } from "@/services/stream.js";
|
||||||
|
import { ChannelFollowingsCache } from "@/misc/cache.js";
|
||||||
|
import { scyllaClient } from "@/db/scylla.js";
|
||||||
|
|
||||||
export const meta = {
|
export const meta = {
|
||||||
tags: ["channels"],
|
tags: ["channels"],
|
||||||
|
@ -44,5 +46,10 @@ export default define(meta, paramDef, async (ps, user) => {
|
||||||
followeeId: channel.id,
|
followeeId: channel.id,
|
||||||
});
|
});
|
||||||
|
|
||||||
|
if (scyllaClient) {
|
||||||
|
const cache = await ChannelFollowingsCache.init(user.id);
|
||||||
|
await cache.follow(channel.id);
|
||||||
|
}
|
||||||
|
|
||||||
publishUserEvent(user.id, "followChannel", channel);
|
publishUserEvent(user.id, "followChannel", channel);
|
||||||
});
|
});
|
||||||
|
|
|
@ -1,7 +1,9 @@
|
||||||
|
import { ChannelFollowingsCache } from "@/misc/cache.js";
|
||||||
import define from "../../define.js";
|
import define from "../../define.js";
|
||||||
import { ApiError } from "../../error.js";
|
import { ApiError } from "../../error.js";
|
||||||
import { Channels, ChannelFollowings } from "@/models/index.js";
|
import { Channels, ChannelFollowings } from "@/models/index.js";
|
||||||
import { publishUserEvent } from "@/services/stream.js";
|
import { publishUserEvent } from "@/services/stream.js";
|
||||||
|
import { scyllaClient } from "@/db/scylla.js";
|
||||||
|
|
||||||
export const meta = {
|
export const meta = {
|
||||||
tags: ["channels"],
|
tags: ["channels"],
|
||||||
|
@ -41,5 +43,10 @@ export default define(meta, paramDef, async (ps, user) => {
|
||||||
followeeId: channel.id,
|
followeeId: channel.id,
|
||||||
});
|
});
|
||||||
|
|
||||||
|
if (scyllaClient) {
|
||||||
|
const cache = await ChannelFollowingsCache.init(user.id);
|
||||||
|
await cache.unfollow(channel.id);
|
||||||
|
}
|
||||||
|
|
||||||
publishUserEvent(user.id, "unfollowChannel", channel);
|
publishUserEvent(user.id, "unfollowChannel", channel);
|
||||||
});
|
});
|
||||||
|
|
|
@ -1,5 +1,5 @@
|
||||||
import { Brackets } from "typeorm";
|
import { Brackets } from "typeorm";
|
||||||
import { Notes, Followings } from "@/models/index.js";
|
import { Notes, Followings, ChannelFollowings } from "@/models/index.js";
|
||||||
import { activeUsersChart } from "@/services/chart/index.js";
|
import { activeUsersChart } from "@/services/chart/index.js";
|
||||||
import define from "../../define.js";
|
import define from "../../define.js";
|
||||||
import { makePaginationQuery } from "../../common/make-pagination-query.js";
|
import { makePaginationQuery } from "../../common/make-pagination-query.js";
|
||||||
|
@ -17,7 +17,7 @@ import {
|
||||||
prepared,
|
prepared,
|
||||||
scyllaClient,
|
scyllaClient,
|
||||||
} from "@/db/scylla.js";
|
} from "@/db/scylla.js";
|
||||||
import { LocalFollowingsCache } from "@/misc/cache.js";
|
import { ChannelFollowingsCache, LocalFollowingsCache } from "@/misc/cache.js";
|
||||||
|
|
||||||
export const meta = {
|
export const meta = {
|
||||||
tags: ["notes"],
|
tags: ["notes"],
|
||||||
|
@ -75,7 +75,7 @@ export default define(meta, paramDef, async (ps, user) => {
|
||||||
|
|
||||||
if (scyllaClient) {
|
if (scyllaClient) {
|
||||||
let untilDate = new Date();
|
let untilDate = new Date();
|
||||||
const foundNotes: ScyllaNote[] = [];
|
let foundNotes: ScyllaNote[] = [];
|
||||||
const validIds = [user.id].concat(await followingsCache.getAll());
|
const validIds = [user.id].concat(await followingsCache.getAll());
|
||||||
|
|
||||||
while (foundNotes.length < ps.limit) {
|
while (foundNotes.length < ps.limit) {
|
||||||
|
@ -102,6 +102,20 @@ export default define(meta, paramDef, async (ps, user) => {
|
||||||
untilDate = notes[notes.length - 1].createdAt;
|
untilDate = notes[notes.length - 1].createdAt;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Filter channels
|
||||||
|
if (!user) {
|
||||||
|
foundNotes = foundNotes.filter((note) => !note.channelId);
|
||||||
|
} else {
|
||||||
|
const channelNotes = foundNotes.filter((note) => !!note.channelId);
|
||||||
|
if (channelNotes.length > 0) {
|
||||||
|
const cache = await ChannelFollowingsCache.init(user.id);
|
||||||
|
const followingIds = await cache.getAll();
|
||||||
|
foundNotes = foundNotes.filter(
|
||||||
|
(note) => !note.channelId || followingIds.includes(note.channelId),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return Notes.packMany(foundNotes, user);
|
return Notes.packMany(foundNotes, user);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue