store cache values to redis

This commit is contained in:
Namekuji 2023-07-02 20:37:46 -04:00
parent 37f08826d1
commit 947163fde2
No known key found for this signature in database
GPG key ID: 1D62332C07FBA532
11 changed files with 131 additions and 88 deletions

View file

@ -34,6 +34,7 @@
"@koa/cors": "3.4.3",
"@koa/multer": "3.0.2",
"@koa/router": "9.0.1",
"@msgpack/msgpack": "3.0.0-beta2",
"@peertube/http-signature": "1.7.0",
"@redocly/openapi-core": "1.0.0-beta.120",
"@sinonjs/fake-timers": "9.1.2",
@ -43,7 +44,6 @@
"ajv": "8.12.0",
"archiver": "5.3.1",
"argon2": "^0.30.3",
"async-mutex": "^0.4.0",
"autobind-decorator": "2.4.0",
"autolinker": "4.0.0",
"autwh": "0.1.0",

View file

@ -1,43 +1,75 @@
import { redisClient } from "@/db/redis.js";
import { nativeRandomStr } from "native-utils/built/index.js";
import { encode, decode } from "@msgpack/msgpack";
import { ChainableCommander } from "ioredis";
export class Cache<T> {
public cache: Map<string | null, { date: number; value: T }>;
private lifetime: number;
private ttl: number;
private fingerprint: string;
constructor(lifetime: Cache<never>["lifetime"]) {
this.cache = new Map();
this.lifetime = lifetime;
constructor(ttl: number) {
this.ttl = ttl;
this.fingerprint = `cache:${nativeRandomStr(32)}`;
}
public set(key: string | null, value: T): void {
this.cache.set(key, {
date: Date.now(),
value,
});
private prefixedKey(key: string | null): string {
return key ? `${this.fingerprint}:${key}` : this.fingerprint;
}
public get(key: string | null): T | undefined {
const cached = this.cache.get(key);
if (cached == null) return undefined;
if (Date.now() - cached.date > this.lifetime) {
this.cache.delete(key);
return undefined;
public async set(key: string | null, value: T, transaction?: ChainableCommander): Promise<void> {
const _key = this.prefixedKey(key);
const _value = Buffer.from(encode(value));
const commander = transaction ?? redisClient;
if (this.ttl === Infinity) {
await commander.set(_key, _value);
} else {
await commander.set(_key, _value, "PX", this.ttl);
}
return cached.value;
}
public delete(key: string | null) {
this.cache.delete(key);
public async get(key: string | null): Promise<T | undefined> {
const _key = this.prefixedKey(key);
const cached = await redisClient.getBuffer(_key);
if (cached === null) return undefined;
return decode(cached) as T;
}
public async getAll(): Promise<Map<string, T>> {
const keys = await redisClient.keys(`${this.fingerprint}*`);
const map = new Map<string, T>();
if (keys.length === 0) {
return map;
}
const values = await redisClient.mgetBuffer(keys);
for (const [i, key] of keys.entries()) {
const val = values[i];
if (val !== null) {
map.set(key, decode(val) as T);
}
}
return map;
}
public async delete(...keys: (string | null)[]): Promise<void> {
if (keys.length > 0) {
const _keys = keys.map(this.prefixedKey);
await redisClient.del(_keys);
}
}
/**
* fetcherを呼び出して結果をキャッシュ&
* optional: キャッシュが存在してもvalidatorでfalseを返すとキャッシュ無効扱いにします
* Returns if cached value exists. Otherwise, calls fetcher and caches.
* Overwrites cached value if invalidated by the optional validator.
*/
public async fetch(
key: string | null,
fetcher: () => Promise<T>,
validator?: (cachedValue: T) => boolean,
): Promise<T> {
const cachedValue = this.get(key);
const cachedValue = await this.get(key);
if (cachedValue !== undefined) {
if (validator) {
if (validator(cachedValue)) {
@ -52,20 +84,20 @@ export class Cache<T> {
// Cache MISS
const value = await fetcher();
this.set(key, value);
await this.set(key, value);
return value;
}
/**
* fetcherを呼び出して結果をキャッシュ&
* optional: キャッシュが存在してもvalidatorでfalseを返すとキャッシュ無効扱いにします
* Returns if cached value exists. Otherwise, calls fetcher and caches if the fetcher returns a value.
* Overwrites cached value if invalidated by the optional validator.
*/
public async fetchMaybe(
key: string | null,
fetcher: () => Promise<T | undefined>,
validator?: (cachedValue: T) => boolean,
): Promise<T | undefined> {
const cachedValue = this.get(key);
const cachedValue = await this.get(key);
if (cachedValue !== undefined) {
if (validator) {
if (validator(cachedValue)) {
@ -81,7 +113,7 @@ export class Cache<T> {
// Cache MISS
const value = await fetcher();
if (value !== undefined) {
this.set(key, value);
await this.set(key, value);
}
return value;
}

View file

@ -1,9 +1,10 @@
import probeImageSize from "probe-image-size";
import { Mutex, withTimeout } from "async-mutex";
import { Mutex } from "redis-semaphore";
import { FILE_TYPE_BROWSERSAFE } from "@/const.js";
import Logger from "@/services/logger.js";
import { Cache } from "./cache.js";
import { redisClient } from "@/db/redis.js";
export type Size = {
width: number;
@ -11,23 +12,30 @@ export type Size = {
};
const cache = new Cache<boolean>(1000 * 60 * 10); // once every 10 minutes for the same url
const mutex = withTimeout(new Mutex(), 1000);
const logger = new Logger("emoji");
export async function getEmojiSize(url: string): Promise<Size> {
const logger = new Logger("emoji");
let attempted = true;
await mutex.runExclusive(() => {
const attempted = cache.get(url);
const lock = new Mutex(redisClient, "getEmojiSize");
await lock.acquire();
try {
attempted = (await cache.get(url)) === true;
if (!attempted) {
cache.set(url, true);
} else {
await cache.set(url, true);
}
} finally {
await lock.release();
}
if (attempted) {
logger.warn(`Attempt limit exceeded: ${url}`);
throw new Error("Too many attempts");
}
});
try {
logger.info(`Retrieving emoji size from ${url}`);
logger.debug(`Retrieving emoji size from ${url}`);
const { width, height, mime } = await probeImageSize(url, {
timeout: 5000,
});

View file

@ -7,6 +7,7 @@ import { isSelfHost, toPunyNullable } from "./convert-host.js";
import { decodeReaction } from "./reaction-lib.js";
import config from "@/config/index.js";
import { query } from "@/prelude/url.js";
import { redisClient } from "@/db/redis.js";
const cache = new Cache<Emoji | null>(1000 * 60 * 60 * 12);
@ -75,7 +76,7 @@ export async function populateEmoji(
if (emoji && !(emoji.width && emoji.height)) {
emoji = await queryOrNull();
cache.set(cacheKey, emoji);
await cache.set(cacheKey, emoji);
}
if (emoji == null) return null;
@ -150,7 +151,7 @@ export async function prefetchEmojis(
emojis: { name: string; host: string | null }[],
): Promise<void> {
const notCachedEmojis = emojis.filter(
(emoji) => cache.get(`${emoji.name} ${emoji.host}`) == null,
async (emoji) => !(await cache.get(`${emoji.name} ${emoji.host}`)),
);
const emojisQuery: any[] = [];
const hosts = new Set(notCachedEmojis.map((e) => e.host));
@ -169,7 +170,9 @@ export async function prefetchEmojis(
select: ["name", "host", "originalUrl", "publicUrl"],
})
: [];
const trans = redisClient.multi();
for (const emoji of _emojis) {
cache.set(`${emoji.name} ${emoji.host}`, emoji);
cache.set(`${emoji.name} ${emoji.host}`, emoji, trans);
}
await trans.exec();
}

View file

@ -135,14 +135,14 @@ export async function fetchPerson(
): Promise<CacheableUser | null> {
if (typeof uri !== "string") throw new Error("uri is not string");
const cached = uriPersonCache.get(uri);
const cached = await uriPersonCache.get(uri);
if (cached) return cached;
// Fetch from the database if the URI points to this server
if (uri.startsWith(`${config.url}/`)) {
const id = uri.split("/").pop();
const u = await Users.findOneBy({ id });
if (u) uriPersonCache.set(uri, u);
if (u) await uriPersonCache.set(uri, u);
return u;
}
@ -150,7 +150,7 @@ export async function fetchPerson(
const exist = await Users.findOneBy({ uri });
if (exist) {
uriPersonCache.set(uri, exist);
await uriPersonCache.set(uri, exist);
return exist;
}
//#endregion

View file

@ -25,12 +25,12 @@ export default class ActiveUsersChart extends Chart<typeof schema> {
return {};
}
public async read(user: {
public read(user: {
id: User["id"];
host: null;
createdAt: User["createdAt"];
}): Promise<void> {
await this.commit({
}) {
this.commit({
read: [user.id],
registeredWithinWeek:
Date.now() - user.createdAt.getTime() < week ? [user.id] : [],

View file

@ -9,7 +9,7 @@ const ACTOR_USERNAME = "instance.actor" as const;
const cache = new Cache<ILocalUser>(Infinity);
export async function getInstanceActor(): Promise<ILocalUser> {
const cached = cache.get(null);
const cached = await cache.get(null);
if (cached) return cached;
const user = (await Users.findOneBy({
@ -18,11 +18,11 @@ export async function getInstanceActor(): Promise<ILocalUser> {
})) as ILocalUser | undefined;
if (user) {
cache.set(null, user);
await cache.set(null, user);
return user;
} else {
const created = (await createSystemUser(ACTOR_USERNAME)) as ILocalUser;
cache.set(null, created);
await cache.set(null, created);
return created;
}
}

View file

@ -9,25 +9,25 @@ const cache = new Cache<Instance>(1000 * 60 * 60);
export async function registerOrFetchInstanceDoc(
host: string,
): Promise<Instance> {
host = toPuny(host);
const _host = toPuny(host);
const cached = cache.get(host);
const cached = await cache.get(_host);
if (cached) return cached;
const index = await Instances.findOneBy({ host });
const index = await Instances.findOneBy({ host: _host });
if (index == null) {
const i = await Instances.insert({
id: genId(),
host,
host: _host,
caughtAt: new Date(),
lastCommunicatedAt: new Date(),
}).then((x) => Instances.findOneByOrFail(x.identifiers[0]));
cache.set(host, i);
await cache.set(_host, i);
return i;
} else {
cache.set(host, index);
await cache.set(_host, index);
return index;
}
}

View file

@ -90,7 +90,7 @@ async function updateRelaysCache() {
const relays = await Relays.findBy({
status: "accepted",
});
relaysCache.set(null, relays);
await relaysCache.set(null, relays);
}
export async function relayRejected(id: string) {

View file

@ -6,7 +6,7 @@ import type {
import { User } from "@/models/entities/user.js";
import { Users } from "@/models/index.js";
import { Cache } from "@/misc/cache.js";
import { subscriber } from "@/db/redis.js";
import { redisClient, subscriber } from "@/db/redis.js";
export const userByIdCache = new Cache<CacheableUser>(Infinity);
export const localUserByNativeTokenCache = new Cache<CacheableLocalUser | null>(
@ -22,13 +22,12 @@ subscriber.on("message", async (_, data) => {
const { type, body } = obj.message;
switch (type) {
case "localUserUpdated": {
userByIdCache.delete(body.id);
localUserByIdCache.delete(body.id);
localUserByNativeTokenCache.cache.forEach((v, k) => {
if (v.value?.id === body.id) {
localUserByNativeTokenCache.delete(k);
}
});
await userByIdCache.delete(body.id);
await localUserByIdCache.delete(body.id);
const toDelete = Array.from(await localUserByNativeTokenCache.getAll())
.filter((v) => v[1]?.id === body.id)
.map((v) => v[0]);
await localUserByNativeTokenCache.delete(...toDelete);
break;
}
case "userChangeSuspendedState":
@ -36,15 +35,17 @@ subscriber.on("message", async (_, data) => {
case "userChangeModeratorState":
case "remoteUserUpdated": {
const user = await Users.findOneByOrFail({ id: body.id });
userByIdCache.set(user.id, user);
for (const [k, v] of uriPersonCache.cache.entries()) {
if (v.value?.id === user.id) {
uriPersonCache.set(k, user);
await userByIdCache.set(user.id, user);
const trans = redisClient.multi();
for (const [k, v] of (await uriPersonCache.getAll()).entries()) {
if (v?.id === user.id) {
await uriPersonCache.set(k, user, trans);
}
}
await trans.exec();
if (Users.isLocalUser(user)) {
localUserByNativeTokenCache.set(user.token, user);
localUserByIdCache.set(user.id, user);
await localUserByNativeTokenCache.set(user.token, user);
await localUserByIdCache.set(user.id, user);
}
break;
}
@ -52,8 +53,8 @@ subscriber.on("message", async (_, data) => {
const user = (await Users.findOneByOrFail({
id: body.id,
})) as ILocalUser;
localUserByNativeTokenCache.delete(body.oldToken);
localUserByNativeTokenCache.set(body.newToken, user);
await localUserByNativeTokenCache.delete(body.oldToken);
await localUserByNativeTokenCache.set(body.newToken, user);
break;
}
default:

View file

@ -105,6 +105,9 @@ importers:
'@koa/router':
specifier: 9.0.1
version: 9.0.1
'@msgpack/msgpack':
specifier: 3.0.0-beta2
version: 3.0.0-beta2
'@peertube/http-signature':
specifier: 1.7.0
version: 1.7.0
@ -132,9 +135,6 @@ importers:
argon2:
specifier: ^0.30.3
version: 0.30.3
async-mutex:
specifier: ^0.4.0
version: 0.4.0
autobind-decorator:
specifier: 2.4.0
version: 2.4.0
@ -786,7 +786,7 @@ importers:
version: 2.30.0
emojilib:
specifier: github:thatonecalculator/emojilib
version: github.com/thatonecalculator/emojilib/542fcc1a25003afad78f3248ceee8ac6980ddeb8
version: github.com/thatonecalculator/emojilib/06944984a61ee799b7083894258f5fa318d932d1
escape-regexp:
specifier: 0.0.1
version: 0.0.1
@ -2277,6 +2277,11 @@ packages:
os-filter-obj: 2.0.0
dev: true
/@msgpack/msgpack@3.0.0-beta2:
resolution: {integrity: sha512-y+l1PNV0XDyY8sM3YtuMLK5vE3/hkfId+Do8pLo/OPxfxuFAUwcGz3oiiUuV46/aBpwTzZ+mRWVMtlSKbradhw==}
engines: {node: '>= 14'}
dev: false
/@msgpackr-extract/msgpackr-extract-darwin-arm64@3.0.2:
resolution: {integrity: sha512-9bfjwDxIDWmmOKusUcqdS4Rw+SETlp9Dy39Xui9BEGEk19dDwH0jhipwFzEff/pFg95NKymc6TOTbRKcWeRqyQ==}
cpu: [arm64]
@ -4496,12 +4501,6 @@ packages:
stream-exhaust: 1.0.2
dev: true
/async-mutex@0.4.0:
resolution: {integrity: sha512-eJFZ1YhRR8UN8eBLoNzcDPcy/jqjsg6I1AP+KvWQX80BqOSW1oJPJXDylPUEeMr2ZQvHgnQ//Lp6f3RQ1zI7HA==}
dependencies:
tslib: 2.6.0
dev: false
/async-settle@1.0.0:
resolution: {integrity: sha512-VPXfB4Vk49z1LHHodrEQ6Xf7W4gg1w0dAPROHngx7qgDjqmIQ+fXmwgGXTW/ITLai0YLSvWepJOP9EVpMnEAcw==}
engines: {node: '>= 0.10'}
@ -15772,8 +15771,8 @@ packages:
url-polyfill: 1.1.12
dev: true
github.com/thatonecalculator/emojilib/542fcc1a25003afad78f3248ceee8ac6980ddeb8:
resolution: {tarball: https://codeload.github.com/thatonecalculator/emojilib/tar.gz/542fcc1a25003afad78f3248ceee8ac6980ddeb8}
github.com/thatonecalculator/emojilib/06944984a61ee799b7083894258f5fa318d932d1:
resolution: {tarball: https://codeload.github.com/thatonecalculator/emojilib/tar.gz/06944984a61ee799b7083894258f5fa318d932d1}
name: emojilib
version: 3.0.10
dev: true