From 61562a094315367f6d63fd697b25216e4327feae Mon Sep 17 00:00:00 2001 From: naskya Date: Sat, 18 May 2024 07:36:52 +0900 Subject: [PATCH] refactor (backend-rs): use async redis commands --- Cargo.lock | 37 +++++ Cargo.toml | 1 + packages/backend-rs/Cargo.toml | 3 +- packages/backend-rs/index.d.ts | 12 +- packages/backend-rs/src/database/cache.rs | 134 ++++++++++-------- packages/backend-rs/src/database/redis.rs | 33 +++-- .../backend-rs/src/misc/check_hit_antenna.rs | 40 +++--- .../backend-rs/src/misc/get_image_size.rs | 12 +- .../backend-rs/src/misc/latest_version.rs | 9 +- packages/backend-rs/src/service/antenna.rs | 27 ++-- .../src/service/nodeinfo/generate.rs | 4 +- packages/backend-rs/src/service/stream.rs | 15 +- .../backend-rs/src/service/stream/antenna.rs | 3 +- .../backend-rs/src/service/stream/channel.rs | 3 +- .../backend-rs/src/service/stream/chat.rs | 3 +- .../src/service/stream/chat_index.rs | 3 +- .../src/service/stream/custom_emoji.rs | 3 +- .../src/service/stream/group_chat.rs | 7 +- .../src/service/stream/moderation.rs | 3 +- .../api/common/read-messaging-message.ts | 44 +++--- .../server/api/common/read-notification.ts | 4 +- .../server/api/endpoints/admin/emoji/add.ts | 7 +- .../server/api/endpoints/admin/emoji/copy.ts | 7 +- .../src/server/api/endpoints/i/known-as.ts | 2 +- .../src/server/api/endpoints/i/update.ts | 4 +- .../notifications/mark-all-as-read.ts | 6 +- .../api/endpoints/users/report-abuse.ts | 4 +- .../backend/src/server/api/stream/index.ts | 18 ++- .../src/services/create-notification.ts | 6 +- .../backend/src/services/messages/create.ts | 68 +++++---- .../backend/src/services/messages/delete.ts | 16 ++- 31 files changed, 331 insertions(+), 207 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 7e1cf69a05..c162ec938d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -239,6 +239,7 @@ dependencies = [ "sysinfo", "thiserror", "tokio", + "tokio-test", "tracing", "tracing-subscriber", "url", @@ -564,7 +565,11 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ba5a308b75df32fe02788e748662718f03fde005016435c444eea572398219fd" dependencies = [ "bytes", + "futures-core", "memchr", + "pin-project-lite", + "tokio", + "tokio-util", ] [[package]] @@ -2618,12 +2623,18 @@ version = "0.25.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6472825949c09872e8f2c50bde59fcefc17748b6be5c90fd67cd8b4daca73bfd" dependencies = [ + "async-trait", + "bytes", "combine", + "futures-util", "itoa", "percent-encoding", + "pin-project-lite", "ryu", "sha1_smol", "socket2", + "tokio", + "tokio-util", "url", ] @@ -3767,6 +3778,32 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-test" +version = "0.4.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2468baabc3311435b55dd935f702f42cd1b8abb7e754fb7dfb16bd36aa88f9f7" +dependencies = [ + "async-stream", + "bytes", + "futures-core", + "tokio", + "tokio-stream", +] + +[[package]] +name = "tokio-util" +version = "0.7.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9cf6b47b3771c49ac75ad09a6162f53ad4b8088b76ac60e8ec1455b31a189fe1" +dependencies = [ + "bytes", + "futures-core", + "futures-sink", + "pin-project-lite", + "tokio", +] + [[package]] name = "toml" version = "0.8.13" diff --git a/Cargo.toml b/Cargo.toml index e7220aa1ed..8a9e5485e7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -38,6 +38,7 @@ syn = "2.0.64" sysinfo = "0.30.12" thiserror = "1.0.61" tokio = "1.37.0" +tokio-test = "0.4.4" tracing = "0.1.40" tracing-subscriber = "0.3.18" url = "2.5.0" diff --git a/packages/backend-rs/Cargo.toml b/packages/backend-rs/Cargo.toml index 0a307c1ef9..1c274e5ed7 100644 --- a/packages/backend-rs/Cargo.toml +++ b/packages/backend-rs/Cargo.toml @@ -31,7 +31,7 @@ nom-exif = { workspace = true } once_cell = { workspace = true } openssl = { workspace = true, features = ["vendored"] } rand = { workspace = true } -redis = { workspace = true } +redis = { workspace = true, features = ["tokio-comp"] } regex = { workspace = true } rmp-serde = { workspace = true } sea-orm = { workspace = true, features = ["sqlx-postgres", "runtime-tokio-rustls"] } @@ -50,6 +50,7 @@ web-push = { workspace = true } [dev-dependencies] pretty_assertions = { workspace = true } +tokio-test = { workspace = true } [build-dependencies] napi-build = { workspace = true } diff --git a/packages/backend-rs/index.d.ts b/packages/backend-rs/index.d.ts index 854a1f894b..7352414f20 100644 --- a/packages/backend-rs/index.d.ts +++ b/packages/backend-rs/index.d.ts @@ -1288,19 +1288,19 @@ export enum PushNotificationKind { ReadAllNotifications = 'readAllNotifications' } export function sendPushNotification(receiverUserId: string, kind: PushNotificationKind, content: any): Promise -export function publishToChannelStream(channelId: string, userId: string): void +export function publishToChannelStream(channelId: string, userId: string): Promise export enum ChatEvent { Message = 'message', Read = 'read', Deleted = 'deleted', Typing = 'typing' } -export function publishToChatStream(senderUserId: string, receiverUserId: string, kind: ChatEvent, object: any): void +export function publishToChatStream(senderUserId: string, receiverUserId: string, kind: ChatEvent, object: any): Promise export enum ChatIndexEvent { Message = 'message', Read = 'read' } -export function publishToChatIndexStream(userId: string, kind: ChatIndexEvent, object: any): void +export function publishToChatIndexStream(userId: string, kind: ChatIndexEvent, object: any): Promise export interface PackedEmoji { id: string aliases: Array @@ -1312,15 +1312,15 @@ export interface PackedEmoji { width: number | null height: number | null } -export function publishToBroadcastStream(emoji: PackedEmoji): void -export function publishToGroupChatStream(groupId: string, kind: ChatEvent, object: any): void +export function publishToBroadcastStream(emoji: PackedEmoji): Promise +export function publishToGroupChatStream(groupId: string, kind: ChatEvent, object: any): Promise export interface AbuseUserReportLike { id: string targetUserId: string reporterId: string comment: string } -export function publishToModerationStream(moderatorId: string, report: AbuseUserReportLike): void +export function publishToModerationStream(moderatorId: string, report: AbuseUserReportLike): Promise export function getTimestamp(id: string): number /** * The generated ID results in the form of `[8 chars timestamp] + [cuid2]`. diff --git a/packages/backend-rs/src/database/cache.rs b/packages/backend-rs/src/database/cache.rs index 6760004c2d..393cf419b2 100644 --- a/packages/backend-rs/src/database/cache.rs +++ b/packages/backend-rs/src/database/cache.rs @@ -1,5 +1,5 @@ use crate::database::{redis_conn, redis_key}; -use redis::{Commands, RedisError}; +use redis::{AsyncCommands, RedisError}; use serde::{Deserialize, Serialize}; #[derive(strum::Display, Debug)] @@ -54,26 +54,31 @@ fn wildcard(category: Category) -> String { /// /// ``` /// # use backend_rs::database::cache; +/// # tokio_test::block_on(async { /// let key = "apple"; /// let data = "I want to cache this string".to_string(); /// /// // caches the data for 10 seconds -/// cache::set(key, &data, 10); +/// cache::set(key, &data, 10).await; /// /// // get the cache -/// let cached_data = cache::get::(key).unwrap(); +/// let cached_data = cache::get::(key).await.unwrap(); /// assert_eq!(data, cached_data.unwrap()); +/// # }) /// ``` -pub fn set Deserialize<'a> + Serialize>( +pub async fn set Deserialize<'a> + Serialize>( key: &str, value: &V, expire_seconds: u64, ) -> Result<(), Error> { - redis_conn()?.set_ex( - prefix_key(key), - rmp_serde::encode::to_vec(&value)?, - expire_seconds, - )?; + redis_conn() + .await? + .set_ex( + prefix_key(key), + rmp_serde::encode::to_vec(&value)?, + expire_seconds, + ) + .await?; Ok(()) } @@ -90,22 +95,24 @@ pub fn set Deserialize<'a> + Serialize>( /// /// ``` /// # use backend_rs::database::cache; +/// # tokio_test::block_on(async { /// let key = "banana"; /// let data = "I want to cache this string".to_string(); /// /// // set cache -/// cache::set(key, &data, 10).unwrap(); +/// cache::set(key, &data, 10).await.unwrap(); /// /// // get cache -/// let cached_data = cache::get::(key).unwrap(); +/// let cached_data = cache::get::(key).await.unwrap(); /// assert_eq!(data, cached_data.unwrap()); /// /// // get nonexistent (or expired) cache -/// let no_cache = cache::get::("nonexistent").unwrap(); +/// let no_cache = cache::get::("nonexistent").await.unwrap(); /// assert!(no_cache.is_none()); +/// # }) /// ``` -pub fn get Deserialize<'a> + Serialize>(key: &str) -> Result, Error> { - let serialized_value: Option> = redis_conn()?.get(prefix_key(key))?; +pub async fn get Deserialize<'a> + Serialize>(key: &str) -> Result, Error> { + let serialized_value: Option> = redis_conn().await?.get(prefix_key(key)).await?; Ok(match serialized_value { Some(v) => Some(rmp_serde::from_slice::(v.as_ref())?), None => None, @@ -125,22 +132,24 @@ pub fn get Deserialize<'a> + Serialize>(key: &str) -> Result("foo").unwrap(); +/// let cached_value = cache::get::("foo").await.unwrap(); /// assert!(cached_value.is_none()); +/// # }) /// ``` -pub fn delete(key: &str) -> Result<(), Error> { - Ok(redis_conn()?.del(prefix_key(key))?) +pub async fn delete(key: &str) -> Result<(), Error> { + Ok(redis_conn().await?.del(prefix_key(key)).await?) } /// Sets a Redis cache under a `category`. @@ -154,13 +163,13 @@ pub fn delete(key: &str) -> Result<(), Error> { /// * `key` - key (will be prefixed automatically) /// * `value` - (de)serializable value /// * `expire_seconds` - TTL -pub fn set_one Deserialize<'a> + Serialize>( +pub async fn set_one Deserialize<'a> + Serialize>( category: Category, key: &str, value: &V, expire_seconds: u64, ) -> Result<(), Error> { - set(&categorize(category, key), value, expire_seconds) + set(&categorize(category, key), value, expire_seconds).await } /// Gets a Redis cache under a `category`. @@ -171,11 +180,11 @@ pub fn set_one Deserialize<'a> + Serialize>( /// /// * `category` - one of [Category] /// * `key` - key (will be prefixed automatically) -pub fn get_one Deserialize<'a> + Serialize>( +pub async fn get_one Deserialize<'a> + Serialize>( category: Category, key: &str, ) -> Result, Error> { - get(&categorize(category, key)) + get(&categorize(category, key)).await } /// Deletes a Redis cache under a `category`. @@ -186,8 +195,8 @@ pub fn get_one Deserialize<'a> + Serialize>( /// /// * `category` - one of [Category] /// * `key` - key (will be prefixed automatically) -pub fn delete_one(category: Category, key: &str) -> Result<(), Error> { - delete(&categorize(category, key)) +pub async fn delete_one(category: Category, key: &str) -> Result<(), Error> { + delete(&categorize(category, key)).await } /// Deletes all Redis caches under a `category`. @@ -195,28 +204,27 @@ pub fn delete_one(category: Category, key: &str) -> Result<(), Error> { /// ## Arguments /// /// * `category` - one of [Category] -pub fn delete_all(category: Category) -> Result<(), Error> { - let mut redis = redis_conn()?; - let keys: Vec> = redis.keys(wildcard(category))?; +pub async fn delete_all(category: Category) -> Result<(), Error> { + let mut redis = redis_conn().await?; + let keys: Vec> = redis.keys(wildcard(category)).await?; if !keys.is_empty() { - redis.del(keys)? + redis.del(keys).await? } Ok(()) } -// TODO: set_all(), get_all() +// TODO: get_all() #[cfg(test)] mod unit_test { - use crate::database::cache::delete_one; - use super::{delete_all, get, get_one, set, set_one, Category::Test}; + use crate::database::cache::delete_one; use pretty_assertions::assert_eq; - #[test] - fn set_get_expire() { + #[tokio::test] + async fn set_get_expire() { #[derive(serde::Deserialize, serde::Serialize, PartialEq, Debug)] struct Data { id: u32, @@ -235,13 +243,13 @@ mod unit_test { kind: "prime number".to_string(), }; - set(key_1, &value_1, 1).unwrap(); - set(key_2, &value_2, 1).unwrap(); - set(key_3, &value_3, 1).unwrap(); + set(key_1, &value_1, 1).await.unwrap(); + set(key_2, &value_2, 1).await.unwrap(); + set(key_3, &value_3, 1).await.unwrap(); - let cached_value_1: Vec = get(key_1).unwrap().unwrap(); - let cached_value_2: String = get(key_2).unwrap().unwrap(); - let cached_value_3: Data = get(key_3).unwrap().unwrap(); + let cached_value_1: Vec = get(key_1).await.unwrap().unwrap(); + let cached_value_2: String = get(key_2).await.unwrap().unwrap(); + let cached_value_3: Data = get(key_3).await.unwrap().unwrap(); assert_eq!(value_1, cached_value_1); assert_eq!(value_2, cached_value_2); @@ -250,17 +258,17 @@ mod unit_test { // wait for the cache to expire std::thread::sleep(std::time::Duration::from_millis(1100)); - let expired_value_1: Option> = get(key_1).unwrap(); - let expired_value_2: Option> = get(key_2).unwrap(); - let expired_value_3: Option> = get(key_3).unwrap(); + let expired_value_1: Option> = get(key_1).await.unwrap(); + let expired_value_2: Option> = get(key_2).await.unwrap(); + let expired_value_3: Option> = get(key_3).await.unwrap(); assert!(expired_value_1.is_none()); assert!(expired_value_2.is_none()); assert!(expired_value_3.is_none()); } - #[test] - fn use_category() { + #[tokio::test] + async fn use_category() { let key_1 = "fire"; let key_2 = "fish"; let key_3 = "awawa"; @@ -269,24 +277,30 @@ mod unit_test { let value_2 = 998244353u32; let value_3 = 'あ'; - set_one(Test, key_1, &value_1, 5 * 60).unwrap(); - set_one(Test, key_2, &value_2, 5 * 60).unwrap(); - set_one(Test, key_3, &value_3, 5 * 60).unwrap(); + set_one(Test, key_1, &value_1, 5 * 60).await.unwrap(); + set_one(Test, key_2, &value_2, 5 * 60).await.unwrap(); + set_one(Test, key_3, &value_3, 5 * 60).await.unwrap(); - assert_eq!(get_one::(Test, key_1).unwrap().unwrap(), value_1); - assert_eq!(get_one::(Test, key_2).unwrap().unwrap(), value_2); - assert_eq!(get_one::(Test, key_3).unwrap().unwrap(), value_3); + assert_eq!( + get_one::(Test, key_1).await.unwrap().unwrap(), + value_1 + ); + assert_eq!(get_one::(Test, key_2).await.unwrap().unwrap(), value_2); + assert_eq!( + get_one::(Test, key_3).await.unwrap().unwrap(), + value_3 + ); - delete_one(Test, key_1).unwrap(); + delete_one(Test, key_1).await.unwrap(); - assert!(get_one::(Test, key_1).unwrap().is_none()); - assert!(get_one::(Test, key_2).unwrap().is_some()); - assert!(get_one::(Test, key_3).unwrap().is_some()); + assert!(get_one::(Test, key_1).await.unwrap().is_none()); + assert!(get_one::(Test, key_2).await.unwrap().is_some()); + assert!(get_one::(Test, key_3).await.unwrap().is_some()); - delete_all(Test).unwrap(); + delete_all(Test).await.unwrap(); - assert!(get_one::(Test, key_1).unwrap().is_none()); - assert!(get_one::(Test, key_2).unwrap().is_none()); - assert!(get_one::(Test, key_3).unwrap().is_none()); + assert!(get_one::(Test, key_1).await.unwrap().is_none()); + assert!(get_one::(Test, key_2).await.unwrap().is_none()); + assert!(get_one::(Test, key_3).await.unwrap().is_none()); } } diff --git a/packages/backend-rs/src/database/redis.rs b/packages/backend-rs/src/database/redis.rs index b049a75545..03ffdd641a 100644 --- a/packages/backend-rs/src/database/redis.rs +++ b/packages/backend-rs/src/database/redis.rs @@ -1,6 +1,6 @@ use crate::config::CONFIG; use once_cell::sync::OnceCell; -use redis::{Client, Connection, RedisError}; +use redis::{aio::MultiplexedConnection, Client, RedisError}; static REDIS_CLIENT: OnceCell = OnceCell::new(); @@ -32,10 +32,10 @@ fn init_redis() -> Result { Client::open(redis_url) } -pub fn redis_conn() -> Result { +pub async fn redis_conn() -> Result { match REDIS_CLIENT.get() { - Some(client) => Ok(client.get_connection()?), - None => init_redis()?.get_connection(), + Some(client) => Ok(client.get_multiplexed_async_connection().await?), + None => init_redis()?.get_multiplexed_async_connection().await, } } @@ -49,23 +49,26 @@ pub fn key(key: impl ToString) -> String { mod unit_test { use super::redis_conn; use pretty_assertions::assert_eq; - use redis::Commands; + use redis::AsyncCommands; - #[test] - fn connect() { - assert!(redis_conn().is_ok()); - assert!(redis_conn().is_ok()); + #[tokio::test] + async fn connect() { + assert!(redis_conn().await.is_ok()); + assert!(redis_conn().await.is_ok()); } - #[test] - fn access() { - let mut redis = redis_conn().unwrap(); + #[tokio::test] + async fn access() { + let mut redis = redis_conn().await.unwrap(); let key = "CARGO_UNIT_TEST_KEY"; let value = "CARGO_UNIT_TEST_VALUE"; - assert_eq!(redis.set::<&str, &str, String>(key, value).unwrap(), "OK"); - assert_eq!(redis.get::<&str, String>(key).unwrap(), value); - assert_eq!(redis.del::<&str, u32>(key).unwrap(), 1); + assert_eq!( + redis.set::<&str, &str, String>(key, value).await.unwrap(), + "OK" + ); + assert_eq!(redis.get::<&str, String>(key).await.unwrap(), value); + assert_eq!(redis.del::<&str, u32>(key).await.unwrap(), 1); } } diff --git a/packages/backend-rs/src/misc/check_hit_antenna.rs b/packages/backend-rs/src/misc/check_hit_antenna.rs index 957598c852..2b70f0ae8c 100644 --- a/packages/backend-rs/src/misc/check_hit_antenna.rs +++ b/packages/backend-rs/src/misc/check_hit_antenna.rs @@ -104,7 +104,8 @@ pub async fn check_hit_antenna( let db = db_conn().await?; - let blocked_user_ids: Vec = cache::get_one(cache::Category::Block, ¬e.user_id)? + let blocked_user_ids: Vec = cache::get_one(cache::Category::Block, ¬e.user_id) + .await? .unwrap_or({ // cache miss let blocks = blocking::Entity::find() @@ -114,7 +115,7 @@ pub async fn check_hit_antenna( .into_tuple::() .all(db) .await?; - cache::set_one(cache::Category::Block, ¬e.user_id, &blocks, 10 * 60)?; + cache::set_one(cache::Category::Block, ¬e.user_id, &blocks, 10 * 60).await?; blocks }); @@ -125,23 +126,26 @@ pub async fn check_hit_antenna( if [NoteVisibilityEnum::Home, NoteVisibilityEnum::Followers].contains(¬e.visibility) { let following_user_ids: Vec = - cache::get_one(cache::Category::Follow, &antenna.user_id)?.unwrap_or({ - // cache miss - let following = following::Entity::find() - .select_only() - .column(following::Column::FolloweeId) - .filter(following::Column::FollowerId.eq(&antenna.user_id)) - .into_tuple::() - .all(db) + cache::get_one(cache::Category::Follow, &antenna.user_id) + .await? + .unwrap_or({ + // cache miss + let following = following::Entity::find() + .select_only() + .column(following::Column::FolloweeId) + .filter(following::Column::FollowerId.eq(&antenna.user_id)) + .into_tuple::() + .all(db) + .await?; + cache::set_one( + cache::Category::Follow, + &antenna.user_id, + &following, + 10 * 60, + ) .await?; - cache::set_one( - cache::Category::Follow, - &antenna.user_id, - &following, - 10 * 60, - )?; - following - }); + following + }); // if the antenna owner is not following the note author, return false if !following_user_ids.contains(¬e.user_id) { diff --git a/packages/backend-rs/src/misc/get_image_size.rs b/packages/backend-rs/src/misc/get_image_size.rs index 60cf84aec1..750d87b6df 100644 --- a/packages/backend-rs/src/misc/get_image_size.rs +++ b/packages/backend-rs/src/misc/get_image_size.rs @@ -55,10 +55,12 @@ pub async fn get_image_size_from_url(url: &str) -> Result { { let _ = MTX_GUARD.lock().await; - attempted = cache::get_one::(cache::Category::FetchUrl, url)?.is_some(); + attempted = cache::get_one::(cache::Category::FetchUrl, url) + .await? + .is_some(); if !attempted { - cache::set_one(cache::Category::FetchUrl, url, &true, 10 * 60)?; + cache::set_one(cache::Category::FetchUrl, url, &true, 10 * 60).await?; } } @@ -138,7 +140,7 @@ mod unit_test { let mp3_url = "https://firefish.dev/firefish/firefish/-/blob/5891a90f71a8b9d5ea99c683ade7e485c685d642/packages/backend/assets/sounds/aisha/1.mp3"; // delete caches in case you run this test multiple times - cache::delete_all(cache::Category::FetchUrl).unwrap(); + cache::delete_all(cache::Category::FetchUrl).await.unwrap(); let png_size_1 = ImageSize { width: 1024, @@ -207,7 +209,9 @@ mod unit_test { let url = "https://firefish.dev/firefish/firefish/-/raw/5891a90f71a8b9d5ea99c683ade7e485c685d642/packages/backend/assets/splash.png"; // delete caches in case you run this test multiple times - cache::delete_one(cache::Category::FetchUrl, url).unwrap(); + cache::delete_one(cache::Category::FetchUrl, url) + .await + .unwrap(); assert!(get_image_size_from_url(url).await.is_ok()); assert!(get_image_size_from_url(url).await.is_err()); diff --git a/packages/backend-rs/src/misc/latest_version.rs b/packages/backend-rs/src/misc/latest_version.rs index cea6901bbe..69a5a961ab 100644 --- a/packages/backend-rs/src/misc/latest_version.rs +++ b/packages/backend-rs/src/misc/latest_version.rs @@ -46,7 +46,7 @@ async fn get_latest_version() -> Result { #[crate::export] pub async fn latest_version() -> Result { let version: Option = - cache::get_one(cache::Category::FetchUrl, UPSTREAM_PACKAGE_JSON_URL)?; + cache::get_one(cache::Category::FetchUrl, UPSTREAM_PACKAGE_JSON_URL).await?; if let Some(v) = version { tracing::trace!("use cached value: {}", v); @@ -61,7 +61,8 @@ pub async fn latest_version() -> Result { UPSTREAM_PACKAGE_JSON_URL, &fetched_version, 3 * 60 * 60, - )?; + ) + .await?; Ok(fetched_version) } } @@ -97,7 +98,9 @@ mod unit_test { #[tokio::test] async fn check_version() { // delete caches in case you run this test multiple times - cache::delete_one(cache::Category::FetchUrl, UPSTREAM_PACKAGE_JSON_URL).unwrap(); + cache::delete_one(cache::Category::FetchUrl, UPSTREAM_PACKAGE_JSON_URL) + .await + .unwrap(); // fetch from firefish.dev validate_version(latest_version().await.unwrap()); diff --git a/packages/backend-rs/src/service/antenna.rs b/packages/backend-rs/src/service/antenna.rs index b123e4b5d0..dc3d12ebb0 100644 --- a/packages/backend-rs/src/service/antenna.rs +++ b/packages/backend-rs/src/service/antenna.rs @@ -5,7 +5,7 @@ use crate::misc::check_hit_antenna::{check_hit_antenna, AntennaCheckError}; use crate::model::entity::{antenna, note}; use crate::service::stream; use crate::util::id::{get_timestamp, InvalidIdErr}; -use redis::{streams::StreamMaxlen, Commands, RedisError}; +use redis::{streams::StreamMaxlen, AsyncCommands, RedisError}; use sea_orm::{DbErr, EntityTrait}; #[derive(thiserror::Error, Debug)] @@ -33,9 +33,9 @@ type Note = note::Model; async fn antennas() -> Result, Error> { const CACHE_KEY: &str = "antennas"; - Ok(cache::get::>(CACHE_KEY)?.unwrap_or({ + Ok(cache::get::>(CACHE_KEY).await?.unwrap_or({ let antennas = antenna::Entity::find().all(db_conn().await?).await?; - cache::set(CACHE_KEY, &antennas, 5 * 60)?; + cache::set(CACHE_KEY, &antennas, 5 * 60).await?; antennas })) } @@ -52,22 +52,25 @@ pub async fn update_antennas_on_new_note( continue; } if check_hit_antenna(antenna, note.clone(), note_author).await? { - add_note_to_antenna(&antenna.id, ¬e)?; + add_note_to_antenna(&antenna.id, ¬e).await?; } } Ok(()) } -pub fn add_note_to_antenna(antenna_id: &str, note: &Note) -> Result<(), Error> { +pub async fn add_note_to_antenna(antenna_id: &str, note: &Note) -> Result<(), Error> { // for timeline API - redis_conn()?.xadd_maxlen( - redis_key(format!("antennaTimeline:{}", antenna_id)), - StreamMaxlen::Approx(200), - format!("{}-*", get_timestamp(¬e.id)?), - &[("note", ¬e.id)], - )?; + redis_conn() + .await? + .xadd_maxlen( + redis_key(format!("antennaTimeline:{}", antenna_id)), + StreamMaxlen::Approx(200), + format!("{}-*", get_timestamp(¬e.id)?), + &[("note", ¬e.id)], + ) + .await?; // for streaming API - Ok(stream::antenna::publish(antenna_id.to_string(), note)?) + Ok(stream::antenna::publish(antenna_id.to_string(), note).await?) } diff --git a/packages/backend-rs/src/service/nodeinfo/generate.rs b/packages/backend-rs/src/service/nodeinfo/generate.rs index 3385c5a6f8..c88614ee63 100644 --- a/packages/backend-rs/src/service/nodeinfo/generate.rs +++ b/packages/backend-rs/src/service/nodeinfo/generate.rs @@ -116,13 +116,13 @@ async fn generate_nodeinfo_2_1() -> Result { pub async fn nodeinfo_2_1() -> Result { const NODEINFO_2_1_CACHE_KEY: &str = "nodeinfo_2_1"; - let cached = cache::get::(NODEINFO_2_1_CACHE_KEY)?; + let cached = cache::get::(NODEINFO_2_1_CACHE_KEY).await?; if let Some(nodeinfo) = cached { Ok(nodeinfo) } else { let nodeinfo = generate_nodeinfo_2_1().await?; - cache::set(NODEINFO_2_1_CACHE_KEY, &nodeinfo, 60 * 60)?; + cache::set(NODEINFO_2_1_CACHE_KEY, &nodeinfo, 60 * 60).await?; Ok(nodeinfo) } } diff --git a/packages/backend-rs/src/service/stream.rs b/packages/backend-rs/src/service/stream.rs index 279d343f10..3776260d7d 100644 --- a/packages/backend-rs/src/service/stream.rs +++ b/packages/backend-rs/src/service/stream.rs @@ -8,7 +8,7 @@ pub mod moderation; use crate::config::CONFIG; use crate::database::redis_conn; -use redis::{Commands, RedisError}; +use redis::{AsyncCommands, RedisError}; #[derive(strum::Display)] pub enum Stream { @@ -55,7 +55,7 @@ pub enum Error { ValueError(String), } -pub fn publish_to_stream( +pub async fn publish_to_stream( stream: &Stream, kind: Option, value: Option, @@ -70,10 +70,13 @@ pub fn publish_to_stream( value.ok_or(Error::ValueError("Invalid streaming message".to_string()))? }; - redis_conn()?.publish( - &CONFIG.host, - format!("{{\"channel\":\"{}\",\"message\":{}}}", stream, message), - )?; + redis_conn() + .await? + .publish( + &CONFIG.host, + format!("{{\"channel\":\"{}\",\"message\":{}}}", stream, message), + ) + .await?; Ok(()) } diff --git a/packages/backend-rs/src/service/stream/antenna.rs b/packages/backend-rs/src/service/stream/antenna.rs index 3a829df546..3058d9f04c 100644 --- a/packages/backend-rs/src/service/stream/antenna.rs +++ b/packages/backend-rs/src/service/stream/antenna.rs @@ -1,10 +1,11 @@ use crate::model::entity::note; use crate::service::stream::{publish_to_stream, Error, Stream}; -pub fn publish(antenna_id: String, note: ¬e::Model) -> Result<(), Error> { +pub async fn publish(antenna_id: String, note: ¬e::Model) -> Result<(), Error> { publish_to_stream( &Stream::Antenna { antenna_id }, Some("note".to_string()), Some(serde_json::to_string(note)?), ) + .await } diff --git a/packages/backend-rs/src/service/stream/channel.rs b/packages/backend-rs/src/service/stream/channel.rs index 10a04c5e66..9f5cf3802a 100644 --- a/packages/backend-rs/src/service/stream/channel.rs +++ b/packages/backend-rs/src/service/stream/channel.rs @@ -1,10 +1,11 @@ use crate::service::stream::{publish_to_stream, Error, Stream}; #[crate::export(js_name = "publishToChannelStream")] -pub fn publish(channel_id: String, user_id: String) -> Result<(), Error> { +pub async fn publish(channel_id: String, user_id: String) -> Result<(), Error> { publish_to_stream( &Stream::Channel { channel_id }, Some("typing".to_string()), Some(format!("\"{}\"", user_id)), ) + .await } diff --git a/packages/backend-rs/src/service/stream/chat.rs b/packages/backend-rs/src/service/stream/chat.rs index 3015d921e1..84280c319c 100644 --- a/packages/backend-rs/src/service/stream/chat.rs +++ b/packages/backend-rs/src/service/stream/chat.rs @@ -17,7 +17,7 @@ pub enum ChatEvent { // https://github.com/napi-rs/napi-rs/issues/2036 #[crate::export(js_name = "publishToChatStream")] -pub fn publish( +pub async fn publish( sender_user_id: String, receiver_user_id: String, kind: ChatEvent, @@ -31,4 +31,5 @@ pub fn publish( Some(kind.to_string()), Some(serde_json::to_string(object)?), ) + .await } diff --git a/packages/backend-rs/src/service/stream/chat_index.rs b/packages/backend-rs/src/service/stream/chat_index.rs index eb64384dca..6619c5589c 100644 --- a/packages/backend-rs/src/service/stream/chat_index.rs +++ b/packages/backend-rs/src/service/stream/chat_index.rs @@ -13,7 +13,7 @@ pub enum ChatIndexEvent { // https://github.com/napi-rs/napi-rs/issues/2036 #[crate::export(js_name = "publishToChatIndexStream")] -pub fn publish( +pub async fn publish( user_id: String, kind: ChatIndexEvent, object: &serde_json::Value, @@ -23,4 +23,5 @@ pub fn publish( Some(kind.to_string()), Some(serde_json::to_string(object)?), ) + .await } diff --git a/packages/backend-rs/src/service/stream/custom_emoji.rs b/packages/backend-rs/src/service/stream/custom_emoji.rs index 21158fc761..2cd67f5169 100644 --- a/packages/backend-rs/src/service/stream/custom_emoji.rs +++ b/packages/backend-rs/src/service/stream/custom_emoji.rs @@ -18,10 +18,11 @@ pub struct PackedEmoji { } #[crate::export(js_name = "publishToBroadcastStream")] -pub fn publish(emoji: &PackedEmoji) -> Result<(), Error> { +pub async fn publish(emoji: &PackedEmoji) -> Result<(), Error> { publish_to_stream( &Stream::CustomEmoji, Some("emojiAdded".to_string()), Some(format!("{{\"emoji\":{}}}", serde_json::to_string(emoji)?)), ) + .await } diff --git a/packages/backend-rs/src/service/stream/group_chat.rs b/packages/backend-rs/src/service/stream/group_chat.rs index 1e676bbef5..20c04c6fa2 100644 --- a/packages/backend-rs/src/service/stream/group_chat.rs +++ b/packages/backend-rs/src/service/stream/group_chat.rs @@ -4,10 +4,15 @@ use crate::service::stream::{chat::ChatEvent, publish_to_stream, Error, Stream}; // https://github.com/napi-rs/napi-rs/issues/2036 #[crate::export(js_name = "publishToGroupChatStream")] -pub fn publish(group_id: String, kind: ChatEvent, object: &serde_json::Value) -> Result<(), Error> { +pub async fn publish( + group_id: String, + kind: ChatEvent, + object: &serde_json::Value, +) -> Result<(), Error> { publish_to_stream( &Stream::GroupChat { group_id }, Some(kind.to_string()), Some(serde_json::to_string(object)?), ) + .await } diff --git a/packages/backend-rs/src/service/stream/moderation.rs b/packages/backend-rs/src/service/stream/moderation.rs index 576bf9fd21..ef604ed6bf 100644 --- a/packages/backend-rs/src/service/stream/moderation.rs +++ b/packages/backend-rs/src/service/stream/moderation.rs @@ -12,10 +12,11 @@ pub struct AbuseUserReportLike { } #[crate::export(js_name = "publishToModerationStream")] -pub fn publish(moderator_id: String, report: &AbuseUserReportLike) -> Result<(), Error> { +pub async fn publish(moderator_id: String, report: &AbuseUserReportLike) -> Result<(), Error> { publish_to_stream( &Stream::Moderation { moderator_id }, Some("newAbuseUserReport".to_string()), Some(serde_json::to_string(report)?), ) + .await } diff --git a/packages/backend/src/server/api/common/read-messaging-message.ts b/packages/backend/src/server/api/common/read-messaging-message.ts index 2a1667f167..e8706fe86a 100644 --- a/packages/backend/src/server/api/common/read-messaging-message.ts +++ b/packages/backend/src/server/api/common/read-messaging-message.ts @@ -57,13 +57,15 @@ export async function readUserMessagingMessage( ); // Publish event - publishToChatStream(otherpartyId, userId, ChatEvent.Read, messageIds); - publishToChatIndexStream(userId, ChatIndexEvent.Read, messageIds); + await Promise.all([ + publishToChatStream(otherpartyId, userId, ChatEvent.Read, messageIds), + publishToChatIndexStream(userId, ChatIndexEvent.Read, messageIds), + ]); if (!(await Users.getHasUnreadMessagingMessage(userId))) { // 全ての(いままで未読だった)自分宛てのメッセージを(これで)読みましたよというイベントを発行 publishMainStream(userId, "readAllMessagingMessages"); - sendPushNotification(userId, PushNotificationKind.ReadAllChats, {}); + await sendPushNotification(userId, PushNotificationKind.ReadAllChats, {}); } else { // そのユーザーとのメッセージで未読がなければイベント発行 const hasUnread = await MessagingMessages.exists({ @@ -75,9 +77,13 @@ export async function readUserMessagingMessage( }); if (!hasUnread) { - sendPushNotification(userId, PushNotificationKind.ReadAllChatsInTheRoom, { - userId: otherpartyId, - }); + await sendPushNotification( + userId, + PushNotificationKind.ReadAllChatsInTheRoom, + { + userId: otherpartyId, + }, + ); } } } @@ -127,17 +133,19 @@ export async function readGroupMessagingMessage( reads.push(message.id); } - // Publish event - publishToGroupChatStream(groupId, ChatEvent.Read, { - ids: reads, - userId, - }); - publishToChatIndexStream(userId, ChatIndexEvent.Read, reads); + // Publish events + await Promise.all([ + publishToGroupChatStream(groupId, ChatEvent.Read, { + ids: reads, + userId, + }), + publishToChatIndexStream(userId, ChatIndexEvent.Read, reads), + ]); if (!(await Users.getHasUnreadMessagingMessage(userId))) { // 全ての(いままで未読だった)自分宛てのメッセージを(これで)読みましたよというイベントを発行 publishMainStream(userId, "readAllMessagingMessages"); - sendPushNotification(userId, PushNotificationKind.ReadAllChats, {}); + await sendPushNotification(userId, PushNotificationKind.ReadAllChats, {}); } else { // そのグループにおいて未読がなければイベント発行 const hasUnread = await MessagingMessages.createQueryBuilder("message") @@ -151,9 +159,13 @@ export async function readGroupMessagingMessage( .then((x) => x != null); if (!hasUnread) { - sendPushNotification(userId, PushNotificationKind.ReadAllChatsInTheRoom, { - groupId, - }); + await sendPushNotification( + userId, + PushNotificationKind.ReadAllChatsInTheRoom, + { + groupId, + }, + ); } } } diff --git a/packages/backend/src/server/api/common/read-notification.ts b/packages/backend/src/server/api/common/read-notification.ts index 5237406df1..b3fdf31ced 100644 --- a/packages/backend/src/server/api/common/read-notification.ts +++ b/packages/backend/src/server/api/common/read-notification.ts @@ -26,8 +26,8 @@ export async function readNotification( if (result.affected === 0) return; if (!(await Users.getHasUnreadNotification(userId))) - return postReadAllNotifications(userId); - else return postReadNotifications(userId, notificationIds); + return await postReadAllNotifications(userId); + else return await postReadNotifications(userId, notificationIds); } export async function readNotificationByQuery( diff --git a/packages/backend/src/server/api/endpoints/admin/emoji/add.ts b/packages/backend/src/server/api/endpoints/admin/emoji/add.ts index 59139904bc..7b10ad99b1 100644 --- a/packages/backend/src/server/api/endpoints/admin/emoji/add.ts +++ b/packages/backend/src/server/api/endpoints/admin/emoji/add.ts @@ -77,9 +77,10 @@ export default define(meta, paramDef, async (ps, me) => { height: size?.height || null, }).then((x) => Emojis.findOneByOrFail(x.identifiers[0])); - await db.queryResultCache!.remove(["meta_emojis"]); - - publishToBroadcastStream(await Emojis.pack(emoji)); + await Promise.all([ + db.queryResultCache!.remove(["meta_emojis"]), + publishToBroadcastStream(await Emojis.pack(emoji)), + ]); insertModerationLog(me, "addEmoji", { emojiId: emoji.id, diff --git a/packages/backend/src/server/api/endpoints/admin/emoji/copy.ts b/packages/backend/src/server/api/endpoints/admin/emoji/copy.ts index 9b08076b35..8160bf5bf7 100644 --- a/packages/backend/src/server/api/endpoints/admin/emoji/copy.ts +++ b/packages/backend/src/server/api/endpoints/admin/emoji/copy.ts @@ -104,9 +104,10 @@ export default define(meta, paramDef, async (ps, me) => { height: size?.height ?? null, }).then((x) => Emojis.findOneByOrFail(x.identifiers[0])); - await db.queryResultCache!.remove(["meta_emojis"]); - - publishToBroadcastStream(await Emojis.pack(copied)); + await Promise.all([ + db.queryResultCache!.remove(["meta_emojis"]), + publishToBroadcastStream(await Emojis.pack(copied)), + ]); return { id: copied.id, diff --git a/packages/backend/src/server/api/endpoints/i/known-as.ts b/packages/backend/src/server/api/endpoints/i/known-as.ts index 6d1c340bca..62b1cd96ad 100644 --- a/packages/backend/src/server/api/endpoints/i/known-as.ts +++ b/packages/backend/src/server/api/endpoints/i/known-as.ts @@ -102,7 +102,7 @@ export default define(meta, paramDef, async (ps, user) => { acceptAllFollowRequests(user); } - publishToFollowers(user.id); + await publishToFollowers(user.id); return iObj; }); diff --git a/packages/backend/src/server/api/endpoints/i/update.ts b/packages/backend/src/server/api/endpoints/i/update.ts index ac713903af..98d760ddb5 100644 --- a/packages/backend/src/server/api/endpoints/i/update.ts +++ b/packages/backend/src/server/api/endpoints/i/update.ts @@ -351,11 +351,11 @@ export default define(meta, paramDef, async (ps, _user, token) => { // 鍵垢を解除したとき、溜まっていたフォローリクエストがあるならすべて承認 if (user.isLocked && ps.isLocked === false) { - acceptAllFollowRequests(user); + await acceptAllFollowRequests(user); } // フォロワーにUpdateを配信 - publishToFollowers(user.id); + await publishToFollowers(user.id); return iObj; }); diff --git a/packages/backend/src/server/api/endpoints/notifications/mark-all-as-read.ts b/packages/backend/src/server/api/endpoints/notifications/mark-all-as-read.ts index ffb8d8c2c4..0b5f65eabd 100644 --- a/packages/backend/src/server/api/endpoints/notifications/mark-all-as-read.ts +++ b/packages/backend/src/server/api/endpoints/notifications/mark-all-as-read.ts @@ -31,5 +31,9 @@ export default define(meta, paramDef, async (_, user) => { // 全ての通知を読みましたよというイベントを発行 publishMainStream(user.id, "readAllNotifications"); - sendPushNotification(user.id, PushNotificationKind.ReadAllNotifications, {}); + await sendPushNotification( + user.id, + PushNotificationKind.ReadAllNotifications, + {}, + ); }); diff --git a/packages/backend/src/server/api/endpoints/users/report-abuse.ts b/packages/backend/src/server/api/endpoints/users/report-abuse.ts index f43cd8ae0f..8a134cf9e7 100644 --- a/packages/backend/src/server/api/endpoints/users/report-abuse.ts +++ b/packages/backend/src/server/api/endpoints/users/report-abuse.ts @@ -84,8 +84,8 @@ export default define(meta, paramDef, async (ps, me) => { ], }); - for (const moderator of moderators) { - publishToModerationStream(moderator.id, { + for await (const moderator of moderators) { + await publishToModerationStream(moderator.id, { id: report.id, targetUserId: report.targetUserId, reporterId: report.reporterId, diff --git a/packages/backend/src/server/api/stream/index.ts b/packages/backend/src/server/api/stream/index.ts index 3bb0ca073b..8e77677105 100644 --- a/packages/backend/src/server/api/stream/index.ts +++ b/packages/backend/src/server/api/stream/index.ts @@ -287,10 +287,10 @@ export default class Connection { // クライアントの事情を考慮したとき、入力フォームはノートチャンネルやメッセージのメインコンポーネントとは別 // なこともあるため、それらのコンポーネントがそれぞれ各チャンネルに接続するようにするのは面倒なため。 case "typingOnChannel": - this.typingOnChannel(body.channel); + await this.typingOnChannel(body.channel); break; case "typingOnMessaging": - this.typingOnMessaging(body); + await this.typingOnMessaging(body); break; } } @@ -513,26 +513,30 @@ export default class Connection { } } - private typingOnChannel(channelId: ChannelModel["id"]) { + private async typingOnChannel(channelId: ChannelModel["id"]) { if (this.user) { - publishToChannelStream(channelId, this.user.id); + await publishToChannelStream(channelId, this.user.id); } } - private typingOnMessaging(param: { + private async typingOnMessaging(param: { partner?: User["id"]; group?: UserGroup["id"]; }) { if (this.user) { if (param.partner) { - publishToChatStream( + await publishToChatStream( param.partner, this.user.id, ChatEvent.Typing, this.user.id, ); } else if (param.group != null) { - publishToGroupChatStream(param.group, ChatEvent.Typing, this.user.id); + await publishToGroupChatStream( + param.group, + ChatEvent.Typing, + this.user.id, + ); } } } diff --git a/packages/backend/src/services/create-notification.ts b/packages/backend/src/services/create-notification.ts index e62bd38ec4..335ba561e7 100644 --- a/packages/backend/src/services/create-notification.ts +++ b/packages/backend/src/services/create-notification.ts @@ -85,7 +85,11 @@ export async function createNotification( if (fresh == null) return; // 既に削除されているかもしれない // We execute this before, because the server side "read" check doesnt work well with push notifications, the app and service worker will decide themself // when it is best to show push notifications - sendPushNotification(notifieeId, PushNotificationKind.Generic, packed); + await sendPushNotification( + notifieeId, + PushNotificationKind.Generic, + packed, + ); if (fresh.isRead) return; //#region ただしミュートしているユーザーからの通知なら無視 diff --git a/packages/backend/src/services/messages/create.ts b/packages/backend/src/services/messages/create.ts index 931a1e4c57..709e9bc235 100644 --- a/packages/backend/src/services/messages/create.ts +++ b/packages/backend/src/services/messages/create.ts @@ -55,46 +55,54 @@ export async function createMessage( if (recipientUser) { if (Users.isLocalUser(user)) { - // 自分のストリーム - publishToChatStream( - message.userId, - recipientUser.id, - ChatEvent.Message, - messageObj, - ); - publishToChatIndexStream( - message.userId, - ChatIndexEvent.Message, - messageObj, - ); + // my stream + await Promise.all([ + publishToChatStream( + message.userId, + recipientUser.id, + ChatEvent.Message, + messageObj, + ), + publishToChatIndexStream( + message.userId, + ChatIndexEvent.Message, + messageObj, + ), + ]); publishMainStream(message.userId, "messagingMessage", messageObj); } if (Users.isLocalUser(recipientUser)) { - // 相手のストリーム - publishToChatStream( - recipientUser.id, - message.userId, - ChatEvent.Message, - messageObj, - ); - publishToChatIndexStream( - recipientUser.id, - ChatIndexEvent.Message, - messageObj, - ); + // recipient's stream + await Promise.all([ + publishToChatStream( + recipientUser.id, + message.userId, + ChatEvent.Message, + messageObj, + ), + publishToChatIndexStream( + recipientUser.id, + ChatIndexEvent.Message, + messageObj, + ), + ]); publishMainStream(recipientUser.id, "messagingMessage", messageObj); } } else if (recipientGroup != null) { // group's stream - publishToGroupChatStream(recipientGroup.id, ChatEvent.Message, messageObj); + await publishToGroupChatStream( + recipientGroup.id, + ChatEvent.Message, + messageObj, + ); // member's stream const joinings = await UserGroupJoinings.findBy({ userGroupId: recipientGroup.id, }); - for (const joining of joinings) { - publishToChatIndexStream( + for await (const joining of joinings) { + await publishToChatIndexStream( joining.userId, ChatIndexEvent.Message, messageObj, @@ -119,7 +127,7 @@ export async function createMessage( //#endregion publishMainStream(recipientUser.id, "unreadMessagingMessage", messageObj); - sendPushNotification( + await sendPushNotification( recipientUser.id, PushNotificationKind.Chat, messageObj, @@ -129,10 +137,10 @@ export async function createMessage( userGroupId: recipientGroup.id, userId: Not(user.id), }); - for (const joining of joinings) { + for await (const joining of joinings) { if (freshMessage.reads.includes(joining.userId)) return; // 既読 publishMainStream(joining.userId, "unreadMessagingMessage", messageObj); - sendPushNotification( + await sendPushNotification( joining.userId, PushNotificationKind.Chat, messageObj, diff --git a/packages/backend/src/services/messages/delete.ts b/packages/backend/src/services/messages/delete.ts index 5fc4c812e2..063e694ad5 100644 --- a/packages/backend/src/services/messages/delete.ts +++ b/packages/backend/src/services/messages/delete.ts @@ -18,18 +18,20 @@ export async function deleteMessage(message: MessagingMessage) { async function postDeleteMessage(message: MessagingMessage) { if (message.recipientId) { - const user = await Users.findOneByOrFail({ id: message.userId }); - const recipient = await Users.findOneByOrFail({ id: message.recipientId }); + const [user, recipient] = await Promise.all([ + Users.findOneByOrFail({ id: message.userId }), + Users.findOneByOrFail({ id: message.recipientId }), + ]); if (Users.isLocalUser(user)) - publishToChatStream( + await publishToChatStream( message.userId, message.recipientId, ChatEvent.Deleted, message.id, ); if (Users.isLocalUser(recipient)) - publishToChatStream( + await publishToChatStream( message.recipientId, message.userId, ChatEvent.Deleted, @@ -46,6 +48,10 @@ async function postDeleteMessage(message: MessagingMessage) { deliver(user, activity, recipient.inbox); } } else if (message.groupId != null) { - publishToGroupChatStream(message.groupId, ChatEvent.Deleted, message.id); + await publishToGroupChatStream( + message.groupId, + ChatEvent.Deleted, + message.id, + ); } }