perf (backend): store antenna cache in memory
This commit is contained in:
parent
7e2493b257
commit
4390dfcbfb
9 changed files with 54 additions and 22 deletions
1
packages/backend-rs/index.d.ts
vendored
1
packages/backend-rs/index.d.ts
vendored
|
@ -1352,6 +1352,7 @@ export interface Webhook {
|
||||||
latestStatus: number | null
|
latestStatus: number | null
|
||||||
}
|
}
|
||||||
export function updateAntennasOnNewNote(note: Note, noteAuthor: Acct, noteMutedUsers: Array<string>): Promise<void>
|
export function updateAntennasOnNewNote(note: Note, noteAuthor: Acct, noteMutedUsers: Array<string>): Promise<void>
|
||||||
|
export function updateAntennaCache(): Promise<void>
|
||||||
export function watchNote(watcherId: string, noteAuthorId: string, noteId: string): Promise<void>
|
export function watchNote(watcherId: string, noteAuthorId: string, noteId: string): Promise<void>
|
||||||
export function unwatchNote(watcherId: string, noteId: string): Promise<void>
|
export function unwatchNote(watcherId: string, noteId: string): Promise<void>
|
||||||
export enum PushNotificationKind {
|
export enum PushNotificationKind {
|
||||||
|
|
|
@ -310,7 +310,7 @@ if (!nativeBinding) {
|
||||||
throw new Error(`Failed to load native binding`)
|
throw new Error(`Failed to load native binding`)
|
||||||
}
|
}
|
||||||
|
|
||||||
const { SECOND, MINUTE, HOUR, DAY, USER_ONLINE_THRESHOLD, USER_ACTIVE_THRESHOLD, FILE_TYPE_BROWSERSAFE, fetchMeta, updateMetaCache, metaToPugArgs, loadConfig, stringToAcct, acctToString, fetchNodeinfo, nodeinfo_2_1, nodeinfo_2_0, Protocol, Inbound, Outbound, greet, initializeRustLogger, showServerInfo, isBlockedServer, isSilencedServer, isAllowedServer, checkWordMute, getFullApAccount, isSelfHost, isSameOrigin, extractHost, toPuny, isUnicodeEmoji, sqlLikeEscape, safeForSql, formatMilliseconds, getImageSizeFromUrl, getNoteSummary, isQuote, isSafeUrl, latestVersion, toMastodonId, fromMastodonId, nyaify, hashPassword, verifyPassword, isOldPasswordAlgorithm, decodeReaction, countReactions, toDbReaction, removeOldAttestationChallenges, cpuInfo, cpuUsage, memoryUsage, storageUsage, AntennaSrc, DriveFileUsageHint, MutedNoteReason, NoteVisibility, NotificationType, PageVisibility, PollNoteVisibility, RelayStatus, UserEmojiModPerm, UserProfileFfvisibility, UserProfileMutingNotificationTypes, updateAntennasOnNewNote, watchNote, unwatchNote, PushNotificationKind, sendPushNotification, publishToChannelStream, publishToChatStream, ChatIndexEvent, publishToChatIndexStream, publishToBroadcastStream, publishToGroupChatStream, publishToModerationStream, ChatEvent, getTimestamp, genId, genIdAt, generateSecureRandomString, generateUserToken } = nativeBinding
|
const { SECOND, MINUTE, HOUR, DAY, USER_ONLINE_THRESHOLD, USER_ACTIVE_THRESHOLD, FILE_TYPE_BROWSERSAFE, fetchMeta, updateMetaCache, metaToPugArgs, loadConfig, stringToAcct, acctToString, fetchNodeinfo, nodeinfo_2_1, nodeinfo_2_0, Protocol, Inbound, Outbound, greet, initializeRustLogger, showServerInfo, isBlockedServer, isSilencedServer, isAllowedServer, checkWordMute, getFullApAccount, isSelfHost, isSameOrigin, extractHost, toPuny, isUnicodeEmoji, sqlLikeEscape, safeForSql, formatMilliseconds, getImageSizeFromUrl, getNoteSummary, isQuote, isSafeUrl, latestVersion, toMastodonId, fromMastodonId, nyaify, hashPassword, verifyPassword, isOldPasswordAlgorithm, decodeReaction, countReactions, toDbReaction, removeOldAttestationChallenges, cpuInfo, cpuUsage, memoryUsage, storageUsage, AntennaSrc, DriveFileUsageHint, MutedNoteReason, NoteVisibility, NotificationType, PageVisibility, PollNoteVisibility, RelayStatus, UserEmojiModPerm, UserProfileFfvisibility, UserProfileMutingNotificationTypes, updateAntennasOnNewNote, updateAntennaCache, watchNote, unwatchNote, PushNotificationKind, sendPushNotification, publishToChannelStream, publishToChatStream, ChatIndexEvent, publishToChatIndexStream, publishToBroadcastStream, publishToGroupChatStream, publishToModerationStream, ChatEvent, getTimestamp, genId, genIdAt, generateSecureRandomString, generateUserToken } = nativeBinding
|
||||||
|
|
||||||
module.exports.SECOND = SECOND
|
module.exports.SECOND = SECOND
|
||||||
module.exports.MINUTE = MINUTE
|
module.exports.MINUTE = MINUTE
|
||||||
|
@ -378,6 +378,7 @@ module.exports.UserEmojiModPerm = UserEmojiModPerm
|
||||||
module.exports.UserProfileFfvisibility = UserProfileFfvisibility
|
module.exports.UserProfileFfvisibility = UserProfileFfvisibility
|
||||||
module.exports.UserProfileMutingNotificationTypes = UserProfileMutingNotificationTypes
|
module.exports.UserProfileMutingNotificationTypes = UserProfileMutingNotificationTypes
|
||||||
module.exports.updateAntennasOnNewNote = updateAntennasOnNewNote
|
module.exports.updateAntennasOnNewNote = updateAntennasOnNewNote
|
||||||
|
module.exports.updateAntennaCache = updateAntennaCache
|
||||||
module.exports.watchNote = watchNote
|
module.exports.watchNote = watchNote
|
||||||
module.exports.unwatchNote = unwatchNote
|
module.exports.unwatchNote = unwatchNote
|
||||||
module.exports.PushNotificationKind = PushNotificationKind
|
module.exports.PushNotificationKind = PushNotificationKind
|
||||||
|
|
27
packages/backend-rs/src/service/antenna/cache.rs
Normal file
27
packages/backend-rs/src/service/antenna/cache.rs
Normal file
|
@ -0,0 +1,27 @@
|
||||||
|
//! In-memory antennas cache handler
|
||||||
|
|
||||||
|
use crate::{database::db_conn, model::entity::antenna};
|
||||||
|
use sea_orm::prelude::*;
|
||||||
|
use std::sync::Mutex;
|
||||||
|
|
||||||
|
static CACHE: Mutex<Option<Vec<antenna::Model>>> = Mutex::new(None);
|
||||||
|
|
||||||
|
fn set(antennas: &[antenna::Model]) {
|
||||||
|
let _ = CACHE
|
||||||
|
.lock()
|
||||||
|
.map(|mut cache| *cache = Some(antennas.to_owned()));
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(super) async fn update() -> Result<Vec<antenna::Model>, DbErr> {
|
||||||
|
tracing::debug!("updating cache");
|
||||||
|
let antennas = antenna::Entity::find().all(db_conn().await?).await?;
|
||||||
|
set(&antennas);
|
||||||
|
Ok(antennas)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(super) async fn get() -> Result<Vec<antenna::Model>, DbErr> {
|
||||||
|
if let Some(cache) = CACHE.lock().ok().and_then(|cache| cache.clone()) {
|
||||||
|
return Ok(cache);
|
||||||
|
}
|
||||||
|
update().await
|
||||||
|
}
|
|
@ -1,2 +1,4 @@
|
||||||
|
mod cache;
|
||||||
mod check_hit;
|
mod check_hit;
|
||||||
pub mod process_new_note;
|
pub mod process_new_note;
|
||||||
|
pub mod update;
|
||||||
|
|
|
@ -1,10 +1,13 @@
|
||||||
use crate::{
|
use crate::{
|
||||||
database::{cache, db_conn, redis_conn, redis_key, RedisConnError},
|
database::{cache, redis_conn, redis_key, RedisConnError},
|
||||||
federation::acct::Acct,
|
federation::acct::Acct,
|
||||||
misc::get_note_all_texts::{all_texts, PartialNoteToElaborate},
|
misc::get_note_all_texts::{all_texts, PartialNoteToElaborate},
|
||||||
model::entity::{antenna, note},
|
model::entity::note,
|
||||||
service::antenna::check_hit::{check_hit_antenna, AntennaCheckError},
|
service::{
|
||||||
service::stream,
|
antenna,
|
||||||
|
antenna::check_hit::{check_hit_antenna, AntennaCheckError},
|
||||||
|
stream,
|
||||||
|
},
|
||||||
util::id::{get_timestamp, InvalidIdError},
|
util::id::{get_timestamp, InvalidIdError},
|
||||||
};
|
};
|
||||||
use redis::{streams::StreamMaxlen, AsyncCommands, RedisError};
|
use redis::{streams::StreamMaxlen, AsyncCommands, RedisError};
|
||||||
|
@ -32,20 +35,6 @@ pub enum Error {
|
||||||
// https://github.com/napi-rs/napi-rs/issues/2060
|
// https://github.com/napi-rs/napi-rs/issues/2060
|
||||||
type Note = note::Model;
|
type Note = note::Model;
|
||||||
|
|
||||||
// TODO?: it might be better to store this directly in memory
|
|
||||||
// (like fetch_meta) instead of Redis as it's used so much
|
|
||||||
async fn antennas() -> Result<Vec<antenna::Model>, Error> {
|
|
||||||
const CACHE_KEY: &str = "antennas";
|
|
||||||
|
|
||||||
if let Some(antennas) = cache::get::<Vec<antenna::Model>>(CACHE_KEY).await? {
|
|
||||||
Ok(antennas)
|
|
||||||
} else {
|
|
||||||
let antennas = antenna::Entity::find().all(db_conn().await?).await?;
|
|
||||||
cache::set(CACHE_KEY, &antennas, 5 * 60).await?;
|
|
||||||
Ok(antennas)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[crate::export]
|
#[crate::export]
|
||||||
pub async fn update_antennas_on_new_note(
|
pub async fn update_antennas_on_new_note(
|
||||||
note: Note,
|
note: Note,
|
||||||
|
@ -67,7 +56,7 @@ pub async fn update_antennas_on_new_note(
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
// TODO: do this in parallel
|
// TODO: do this in parallel
|
||||||
for antenna in antennas().await?.iter() {
|
for antenna in antenna::cache::get().await?.iter() {
|
||||||
if note_muted_users.contains(&antenna.user_id) {
|
if note_muted_users.contains(&antenna.user_id) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
7
packages/backend-rs/src/service/antenna/update.rs
Normal file
7
packages/backend-rs/src/service/antenna/update.rs
Normal file
|
@ -0,0 +1,7 @@
|
||||||
|
//! This module is (currently) used in the TypeScript backend only.
|
||||||
|
|
||||||
|
#[crate::ts_export]
|
||||||
|
pub async fn update_antenna_cache() -> Result<(), sea_orm::DbErr> {
|
||||||
|
super::cache::update().await?;
|
||||||
|
Ok(())
|
||||||
|
}
|
|
@ -1,5 +1,5 @@
|
||||||
import define from "@/server/api/define.js";
|
import define from "@/server/api/define.js";
|
||||||
import { fetchMeta, genIdAt } from "backend-rs";
|
import { fetchMeta, genIdAt, updateAntennaCache } from "backend-rs";
|
||||||
import { Antennas, UserLists, UserGroupJoinings } from "@/models/index.js";
|
import { Antennas, UserLists, UserGroupJoinings } from "@/models/index.js";
|
||||||
import { ApiError } from "@/server/api/error.js";
|
import { ApiError } from "@/server/api/error.js";
|
||||||
import { publishInternalEvent } from "@/services/stream.js";
|
import { publishInternalEvent } from "@/services/stream.js";
|
||||||
|
@ -173,6 +173,7 @@ export default define(meta, paramDef, async (ps, user) => {
|
||||||
}).then((x) => Antennas.findOneByOrFail(x.identifiers[0]));
|
}).then((x) => Antennas.findOneByOrFail(x.identifiers[0]));
|
||||||
|
|
||||||
publishInternalEvent("antennaCreated", antenna);
|
publishInternalEvent("antennaCreated", antenna);
|
||||||
|
await updateAntennaCache();
|
||||||
|
|
||||||
return await Antennas.pack(antenna);
|
return await Antennas.pack(antenna);
|
||||||
});
|
});
|
||||||
|
|
|
@ -2,6 +2,7 @@ import define from "@/server/api/define.js";
|
||||||
import { ApiError } from "@/server/api/error.js";
|
import { ApiError } from "@/server/api/error.js";
|
||||||
import { Antennas } from "@/models/index.js";
|
import { Antennas } from "@/models/index.js";
|
||||||
import { publishInternalEvent } from "@/services/stream.js";
|
import { publishInternalEvent } from "@/services/stream.js";
|
||||||
|
import { updateAntennaCache } from "backend-rs";
|
||||||
|
|
||||||
export const meta = {
|
export const meta = {
|
||||||
tags: ["antennas"],
|
tags: ["antennas"],
|
||||||
|
@ -40,4 +41,5 @@ export default define(meta, paramDef, async (ps, user) => {
|
||||||
await Antennas.delete(antenna.id);
|
await Antennas.delete(antenna.id);
|
||||||
|
|
||||||
publishInternalEvent("antennaDeleted", antenna);
|
publishInternalEvent("antennaDeleted", antenna);
|
||||||
|
await updateAntennaCache();
|
||||||
});
|
});
|
||||||
|
|
|
@ -2,6 +2,7 @@ import define from "@/server/api/define.js";
|
||||||
import { ApiError } from "@/server/api/error.js";
|
import { ApiError } from "@/server/api/error.js";
|
||||||
import { Antennas, UserLists, UserGroupJoinings } from "@/models/index.js";
|
import { Antennas, UserLists, UserGroupJoinings } from "@/models/index.js";
|
||||||
import { publishInternalEvent } from "@/services/stream.js";
|
import { publishInternalEvent } from "@/services/stream.js";
|
||||||
|
import { updateAntennaCache } from "backend-rs";
|
||||||
|
|
||||||
export const meta = {
|
export const meta = {
|
||||||
tags: ["antennas"],
|
tags: ["antennas"],
|
||||||
|
@ -166,6 +167,7 @@ export default define(meta, paramDef, async (ps, user) => {
|
||||||
"antennaUpdated",
|
"antennaUpdated",
|
||||||
await Antennas.findOneByOrFail({ id: antenna.id }),
|
await Antennas.findOneByOrFail({ id: antenna.id }),
|
||||||
);
|
);
|
||||||
|
await updateAntennaCache();
|
||||||
|
|
||||||
return await Antennas.pack(antenna.id);
|
return await Antennas.pack(antenna.id);
|
||||||
});
|
});
|
||||||
|
|
Loading…
Reference in a new issue