refactor (backend-rs): use async redis commands

This commit is contained in:
naskya 2024-05-18 07:36:52 +09:00
parent 42d4f2fd79
commit 61562a0943
No known key found for this signature in database
GPG key ID: 712D413B3A9FED5C
31 changed files with 331 additions and 207 deletions

37
Cargo.lock generated
View file

@ -239,6 +239,7 @@ dependencies = [
"sysinfo",
"thiserror",
"tokio",
"tokio-test",
"tracing",
"tracing-subscriber",
"url",
@ -564,7 +565,11 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ba5a308b75df32fe02788e748662718f03fde005016435c444eea572398219fd"
dependencies = [
"bytes",
"futures-core",
"memchr",
"pin-project-lite",
"tokio",
"tokio-util",
]
[[package]]
@ -2618,12 +2623,18 @@ version = "0.25.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6472825949c09872e8f2c50bde59fcefc17748b6be5c90fd67cd8b4daca73bfd"
dependencies = [
"async-trait",
"bytes",
"combine",
"futures-util",
"itoa",
"percent-encoding",
"pin-project-lite",
"ryu",
"sha1_smol",
"socket2",
"tokio",
"tokio-util",
"url",
]
@ -3767,6 +3778,32 @@ dependencies = [
"tokio",
]
[[package]]
name = "tokio-test"
version = "0.4.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2468baabc3311435b55dd935f702f42cd1b8abb7e754fb7dfb16bd36aa88f9f7"
dependencies = [
"async-stream",
"bytes",
"futures-core",
"tokio",
"tokio-stream",
]
[[package]]
name = "tokio-util"
version = "0.7.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9cf6b47b3771c49ac75ad09a6162f53ad4b8088b76ac60e8ec1455b31a189fe1"
dependencies = [
"bytes",
"futures-core",
"futures-sink",
"pin-project-lite",
"tokio",
]
[[package]]
name = "toml"
version = "0.8.13"

View file

@ -38,6 +38,7 @@ syn = "2.0.64"
sysinfo = "0.30.12"
thiserror = "1.0.61"
tokio = "1.37.0"
tokio-test = "0.4.4"
tracing = "0.1.40"
tracing-subscriber = "0.3.18"
url = "2.5.0"

View file

@ -31,7 +31,7 @@ nom-exif = { workspace = true }
once_cell = { workspace = true }
openssl = { workspace = true, features = ["vendored"] }
rand = { workspace = true }
redis = { workspace = true }
redis = { workspace = true, features = ["tokio-comp"] }
regex = { workspace = true }
rmp-serde = { workspace = true }
sea-orm = { workspace = true, features = ["sqlx-postgres", "runtime-tokio-rustls"] }
@ -50,6 +50,7 @@ web-push = { workspace = true }
[dev-dependencies]
pretty_assertions = { workspace = true }
tokio-test = { workspace = true }
[build-dependencies]
napi-build = { workspace = true }

View file

@ -1288,19 +1288,19 @@ export enum PushNotificationKind {
ReadAllNotifications = 'readAllNotifications'
}
export function sendPushNotification(receiverUserId: string, kind: PushNotificationKind, content: any): Promise<void>
export function publishToChannelStream(channelId: string, userId: string): void
export function publishToChannelStream(channelId: string, userId: string): Promise<void>
export enum ChatEvent {
Message = 'message',
Read = 'read',
Deleted = 'deleted',
Typing = 'typing'
}
export function publishToChatStream(senderUserId: string, receiverUserId: string, kind: ChatEvent, object: any): void
export function publishToChatStream(senderUserId: string, receiverUserId: string, kind: ChatEvent, object: any): Promise<void>
export enum ChatIndexEvent {
Message = 'message',
Read = 'read'
}
export function publishToChatIndexStream(userId: string, kind: ChatIndexEvent, object: any): void
export function publishToChatIndexStream(userId: string, kind: ChatIndexEvent, object: any): Promise<void>
export interface PackedEmoji {
id: string
aliases: Array<string>
@ -1312,15 +1312,15 @@ export interface PackedEmoji {
width: number | null
height: number | null
}
export function publishToBroadcastStream(emoji: PackedEmoji): void
export function publishToGroupChatStream(groupId: string, kind: ChatEvent, object: any): void
export function publishToBroadcastStream(emoji: PackedEmoji): Promise<void>
export function publishToGroupChatStream(groupId: string, kind: ChatEvent, object: any): Promise<void>
export interface AbuseUserReportLike {
id: string
targetUserId: string
reporterId: string
comment: string
}
export function publishToModerationStream(moderatorId: string, report: AbuseUserReportLike): void
export function publishToModerationStream(moderatorId: string, report: AbuseUserReportLike): Promise<void>
export function getTimestamp(id: string): number
/**
* The generated ID results in the form of `[8 chars timestamp] + [cuid2]`.

View file

@ -1,5 +1,5 @@
use crate::database::{redis_conn, redis_key};
use redis::{Commands, RedisError};
use redis::{AsyncCommands, RedisError};
use serde::{Deserialize, Serialize};
#[derive(strum::Display, Debug)]
@ -54,26 +54,31 @@ fn wildcard(category: Category) -> String {
///
/// ```
/// # use backend_rs::database::cache;
/// # tokio_test::block_on(async {
/// let key = "apple";
/// let data = "I want to cache this string".to_string();
///
/// // caches the data for 10 seconds
/// cache::set(key, &data, 10);
/// cache::set(key, &data, 10).await;
///
/// // get the cache
/// let cached_data = cache::get::<String>(key).unwrap();
/// let cached_data = cache::get::<String>(key).await.unwrap();
/// assert_eq!(data, cached_data.unwrap());
/// # })
/// ```
pub fn set<V: for<'a> Deserialize<'a> + Serialize>(
pub async fn set<V: for<'a> Deserialize<'a> + Serialize>(
key: &str,
value: &V,
expire_seconds: u64,
) -> Result<(), Error> {
redis_conn()?.set_ex(
prefix_key(key),
rmp_serde::encode::to_vec(&value)?,
expire_seconds,
)?;
redis_conn()
.await?
.set_ex(
prefix_key(key),
rmp_serde::encode::to_vec(&value)?,
expire_seconds,
)
.await?;
Ok(())
}
@ -90,22 +95,24 @@ pub fn set<V: for<'a> Deserialize<'a> + Serialize>(
///
/// ```
/// # use backend_rs::database::cache;
/// # tokio_test::block_on(async {
/// let key = "banana";
/// let data = "I want to cache this string".to_string();
///
/// // set cache
/// cache::set(key, &data, 10).unwrap();
/// cache::set(key, &data, 10).await.unwrap();
///
/// // get cache
/// let cached_data = cache::get::<String>(key).unwrap();
/// let cached_data = cache::get::<String>(key).await.unwrap();
/// assert_eq!(data, cached_data.unwrap());
///
/// // get nonexistent (or expired) cache
/// let no_cache = cache::get::<String>("nonexistent").unwrap();
/// let no_cache = cache::get::<String>("nonexistent").await.unwrap();
/// assert!(no_cache.is_none());
/// # })
/// ```
pub fn get<V: for<'a> Deserialize<'a> + Serialize>(key: &str) -> Result<Option<V>, Error> {
let serialized_value: Option<Vec<u8>> = redis_conn()?.get(prefix_key(key))?;
pub async fn get<V: for<'a> Deserialize<'a> + Serialize>(key: &str) -> Result<Option<V>, Error> {
let serialized_value: Option<Vec<u8>> = redis_conn().await?.get(prefix_key(key)).await?;
Ok(match serialized_value {
Some(v) => Some(rmp_serde::from_slice::<V>(v.as_ref())?),
None => None,
@ -125,22 +132,24 @@ pub fn get<V: for<'a> Deserialize<'a> + Serialize>(key: &str) -> Result<Option<V
///
/// ```
/// # use backend_rs::database::cache;
/// # tokio_test::block_on(async {
/// let key = "chocolate";
/// let value = "I want to cache this string".to_string();
///
/// // set cache
/// cache::set(key, &value, 10).unwrap();
/// cache::set(key, &value, 10).await.unwrap();
///
/// // delete the cache
/// cache::delete("foo").unwrap();
/// cache::delete("nonexistent").unwrap(); // this is okay
/// cache::delete("foo").await.unwrap();
/// cache::delete("nonexistent").await.unwrap(); // this is okay
///
/// // the cache is gone
/// let cached_value = cache::get::<String>("foo").unwrap();
/// let cached_value = cache::get::<String>("foo").await.unwrap();
/// assert!(cached_value.is_none());
/// # })
/// ```
pub fn delete(key: &str) -> Result<(), Error> {
Ok(redis_conn()?.del(prefix_key(key))?)
pub async fn delete(key: &str) -> Result<(), Error> {
Ok(redis_conn().await?.del(prefix_key(key)).await?)
}
/// Sets a Redis cache under a `category`.
@ -154,13 +163,13 @@ pub fn delete(key: &str) -> Result<(), Error> {
/// * `key` - key (will be prefixed automatically)
/// * `value` - (de)serializable value
/// * `expire_seconds` - TTL
pub fn set_one<V: for<'a> Deserialize<'a> + Serialize>(
pub async fn set_one<V: for<'a> Deserialize<'a> + Serialize>(
category: Category,
key: &str,
value: &V,
expire_seconds: u64,
) -> Result<(), Error> {
set(&categorize(category, key), value, expire_seconds)
set(&categorize(category, key), value, expire_seconds).await
}
/// Gets a Redis cache under a `category`.
@ -171,11 +180,11 @@ pub fn set_one<V: for<'a> Deserialize<'a> + Serialize>(
///
/// * `category` - one of [Category]
/// * `key` - key (will be prefixed automatically)
pub fn get_one<V: for<'a> Deserialize<'a> + Serialize>(
pub async fn get_one<V: for<'a> Deserialize<'a> + Serialize>(
category: Category,
key: &str,
) -> Result<Option<V>, Error> {
get(&categorize(category, key))
get(&categorize(category, key)).await
}
/// Deletes a Redis cache under a `category`.
@ -186,8 +195,8 @@ pub fn get_one<V: for<'a> Deserialize<'a> + Serialize>(
///
/// * `category` - one of [Category]
/// * `key` - key (will be prefixed automatically)
pub fn delete_one(category: Category, key: &str) -> Result<(), Error> {
delete(&categorize(category, key))
pub async fn delete_one(category: Category, key: &str) -> Result<(), Error> {
delete(&categorize(category, key)).await
}
/// Deletes all Redis caches under a `category`.
@ -195,28 +204,27 @@ pub fn delete_one(category: Category, key: &str) -> Result<(), Error> {
/// ## Arguments
///
/// * `category` - one of [Category]
pub fn delete_all(category: Category) -> Result<(), Error> {
let mut redis = redis_conn()?;
let keys: Vec<Vec<u8>> = redis.keys(wildcard(category))?;
pub async fn delete_all(category: Category) -> Result<(), Error> {
let mut redis = redis_conn().await?;
let keys: Vec<Vec<u8>> = redis.keys(wildcard(category)).await?;
if !keys.is_empty() {
redis.del(keys)?
redis.del(keys).await?
}
Ok(())
}
// TODO: set_all(), get_all()
// TODO: get_all()
#[cfg(test)]
mod unit_test {
use crate::database::cache::delete_one;
use super::{delete_all, get, get_one, set, set_one, Category::Test};
use crate::database::cache::delete_one;
use pretty_assertions::assert_eq;
#[test]
fn set_get_expire() {
#[tokio::test]
async fn set_get_expire() {
#[derive(serde::Deserialize, serde::Serialize, PartialEq, Debug)]
struct Data {
id: u32,
@ -235,13 +243,13 @@ mod unit_test {
kind: "prime number".to_string(),
};
set(key_1, &value_1, 1).unwrap();
set(key_2, &value_2, 1).unwrap();
set(key_3, &value_3, 1).unwrap();
set(key_1, &value_1, 1).await.unwrap();
set(key_2, &value_2, 1).await.unwrap();
set(key_3, &value_3, 1).await.unwrap();
let cached_value_1: Vec<i32> = get(key_1).unwrap().unwrap();
let cached_value_2: String = get(key_2).unwrap().unwrap();
let cached_value_3: Data = get(key_3).unwrap().unwrap();
let cached_value_1: Vec<i32> = get(key_1).await.unwrap().unwrap();
let cached_value_2: String = get(key_2).await.unwrap().unwrap();
let cached_value_3: Data = get(key_3).await.unwrap().unwrap();
assert_eq!(value_1, cached_value_1);
assert_eq!(value_2, cached_value_2);
@ -250,17 +258,17 @@ mod unit_test {
// wait for the cache to expire
std::thread::sleep(std::time::Duration::from_millis(1100));
let expired_value_1: Option<Vec<i32>> = get(key_1).unwrap();
let expired_value_2: Option<Vec<i32>> = get(key_2).unwrap();
let expired_value_3: Option<Vec<i32>> = get(key_3).unwrap();
let expired_value_1: Option<Vec<i32>> = get(key_1).await.unwrap();
let expired_value_2: Option<Vec<i32>> = get(key_2).await.unwrap();
let expired_value_3: Option<Vec<i32>> = get(key_3).await.unwrap();
assert!(expired_value_1.is_none());
assert!(expired_value_2.is_none());
assert!(expired_value_3.is_none());
}
#[test]
fn use_category() {
#[tokio::test]
async fn use_category() {
let key_1 = "fire";
let key_2 = "fish";
let key_3 = "awawa";
@ -269,24 +277,30 @@ mod unit_test {
let value_2 = 998244353u32;
let value_3 = 'あ';
set_one(Test, key_1, &value_1, 5 * 60).unwrap();
set_one(Test, key_2, &value_2, 5 * 60).unwrap();
set_one(Test, key_3, &value_3, 5 * 60).unwrap();
set_one(Test, key_1, &value_1, 5 * 60).await.unwrap();
set_one(Test, key_2, &value_2, 5 * 60).await.unwrap();
set_one(Test, key_3, &value_3, 5 * 60).await.unwrap();
assert_eq!(get_one::<String>(Test, key_1).unwrap().unwrap(), value_1);
assert_eq!(get_one::<u32>(Test, key_2).unwrap().unwrap(), value_2);
assert_eq!(get_one::<char>(Test, key_3).unwrap().unwrap(), value_3);
assert_eq!(
get_one::<String>(Test, key_1).await.unwrap().unwrap(),
value_1
);
assert_eq!(get_one::<u32>(Test, key_2).await.unwrap().unwrap(), value_2);
assert_eq!(
get_one::<char>(Test, key_3).await.unwrap().unwrap(),
value_3
);
delete_one(Test, key_1).unwrap();
delete_one(Test, key_1).await.unwrap();
assert!(get_one::<String>(Test, key_1).unwrap().is_none());
assert!(get_one::<u32>(Test, key_2).unwrap().is_some());
assert!(get_one::<char>(Test, key_3).unwrap().is_some());
assert!(get_one::<String>(Test, key_1).await.unwrap().is_none());
assert!(get_one::<u32>(Test, key_2).await.unwrap().is_some());
assert!(get_one::<char>(Test, key_3).await.unwrap().is_some());
delete_all(Test).unwrap();
delete_all(Test).await.unwrap();
assert!(get_one::<String>(Test, key_1).unwrap().is_none());
assert!(get_one::<u32>(Test, key_2).unwrap().is_none());
assert!(get_one::<char>(Test, key_3).unwrap().is_none());
assert!(get_one::<String>(Test, key_1).await.unwrap().is_none());
assert!(get_one::<u32>(Test, key_2).await.unwrap().is_none());
assert!(get_one::<char>(Test, key_3).await.unwrap().is_none());
}
}

View file

@ -1,6 +1,6 @@
use crate::config::CONFIG;
use once_cell::sync::OnceCell;
use redis::{Client, Connection, RedisError};
use redis::{aio::MultiplexedConnection, Client, RedisError};
static REDIS_CLIENT: OnceCell<Client> = OnceCell::new();
@ -32,10 +32,10 @@ fn init_redis() -> Result<Client, RedisError> {
Client::open(redis_url)
}
pub fn redis_conn() -> Result<Connection, RedisError> {
pub async fn redis_conn() -> Result<MultiplexedConnection, RedisError> {
match REDIS_CLIENT.get() {
Some(client) => Ok(client.get_connection()?),
None => init_redis()?.get_connection(),
Some(client) => Ok(client.get_multiplexed_async_connection().await?),
None => init_redis()?.get_multiplexed_async_connection().await,
}
}
@ -49,23 +49,26 @@ pub fn key(key: impl ToString) -> String {
mod unit_test {
use super::redis_conn;
use pretty_assertions::assert_eq;
use redis::Commands;
use redis::AsyncCommands;
#[test]
fn connect() {
assert!(redis_conn().is_ok());
assert!(redis_conn().is_ok());
#[tokio::test]
async fn connect() {
assert!(redis_conn().await.is_ok());
assert!(redis_conn().await.is_ok());
}
#[test]
fn access() {
let mut redis = redis_conn().unwrap();
#[tokio::test]
async fn access() {
let mut redis = redis_conn().await.unwrap();
let key = "CARGO_UNIT_TEST_KEY";
let value = "CARGO_UNIT_TEST_VALUE";
assert_eq!(redis.set::<&str, &str, String>(key, value).unwrap(), "OK");
assert_eq!(redis.get::<&str, String>(key).unwrap(), value);
assert_eq!(redis.del::<&str, u32>(key).unwrap(), 1);
assert_eq!(
redis.set::<&str, &str, String>(key, value).await.unwrap(),
"OK"
);
assert_eq!(redis.get::<&str, String>(key).await.unwrap(), value);
assert_eq!(redis.del::<&str, u32>(key).await.unwrap(), 1);
}
}

View file

@ -104,7 +104,8 @@ pub async fn check_hit_antenna(
let db = db_conn().await?;
let blocked_user_ids: Vec<String> = cache::get_one(cache::Category::Block, &note.user_id)?
let blocked_user_ids: Vec<String> = cache::get_one(cache::Category::Block, &note.user_id)
.await?
.unwrap_or({
// cache miss
let blocks = blocking::Entity::find()
@ -114,7 +115,7 @@ pub async fn check_hit_antenna(
.into_tuple::<String>()
.all(db)
.await?;
cache::set_one(cache::Category::Block, &note.user_id, &blocks, 10 * 60)?;
cache::set_one(cache::Category::Block, &note.user_id, &blocks, 10 * 60).await?;
blocks
});
@ -125,23 +126,26 @@ pub async fn check_hit_antenna(
if [NoteVisibilityEnum::Home, NoteVisibilityEnum::Followers].contains(&note.visibility) {
let following_user_ids: Vec<String> =
cache::get_one(cache::Category::Follow, &antenna.user_id)?.unwrap_or({
// cache miss
let following = following::Entity::find()
.select_only()
.column(following::Column::FolloweeId)
.filter(following::Column::FollowerId.eq(&antenna.user_id))
.into_tuple::<String>()
.all(db)
cache::get_one(cache::Category::Follow, &antenna.user_id)
.await?
.unwrap_or({
// cache miss
let following = following::Entity::find()
.select_only()
.column(following::Column::FolloweeId)
.filter(following::Column::FollowerId.eq(&antenna.user_id))
.into_tuple::<String>()
.all(db)
.await?;
cache::set_one(
cache::Category::Follow,
&antenna.user_id,
&following,
10 * 60,
)
.await?;
cache::set_one(
cache::Category::Follow,
&antenna.user_id,
&following,
10 * 60,
)?;
following
});
following
});
// if the antenna owner is not following the note author, return false
if !following_user_ids.contains(&note.user_id) {

View file

@ -55,10 +55,12 @@ pub async fn get_image_size_from_url(url: &str) -> Result<ImageSize, Error> {
{
let _ = MTX_GUARD.lock().await;
attempted = cache::get_one::<bool>(cache::Category::FetchUrl, url)?.is_some();
attempted = cache::get_one::<bool>(cache::Category::FetchUrl, url)
.await?
.is_some();
if !attempted {
cache::set_one(cache::Category::FetchUrl, url, &true, 10 * 60)?;
cache::set_one(cache::Category::FetchUrl, url, &true, 10 * 60).await?;
}
}
@ -138,7 +140,7 @@ mod unit_test {
let mp3_url = "https://firefish.dev/firefish/firefish/-/blob/5891a90f71a8b9d5ea99c683ade7e485c685d642/packages/backend/assets/sounds/aisha/1.mp3";
// delete caches in case you run this test multiple times
cache::delete_all(cache::Category::FetchUrl).unwrap();
cache::delete_all(cache::Category::FetchUrl).await.unwrap();
let png_size_1 = ImageSize {
width: 1024,
@ -207,7 +209,9 @@ mod unit_test {
let url = "https://firefish.dev/firefish/firefish/-/raw/5891a90f71a8b9d5ea99c683ade7e485c685d642/packages/backend/assets/splash.png";
// delete caches in case you run this test multiple times
cache::delete_one(cache::Category::FetchUrl, url).unwrap();
cache::delete_one(cache::Category::FetchUrl, url)
.await
.unwrap();
assert!(get_image_size_from_url(url).await.is_ok());
assert!(get_image_size_from_url(url).await.is_err());

View file

@ -46,7 +46,7 @@ async fn get_latest_version() -> Result<String, Error> {
#[crate::export]
pub async fn latest_version() -> Result<String, Error> {
let version: Option<String> =
cache::get_one(cache::Category::FetchUrl, UPSTREAM_PACKAGE_JSON_URL)?;
cache::get_one(cache::Category::FetchUrl, UPSTREAM_PACKAGE_JSON_URL).await?;
if let Some(v) = version {
tracing::trace!("use cached value: {}", v);
@ -61,7 +61,8 @@ pub async fn latest_version() -> Result<String, Error> {
UPSTREAM_PACKAGE_JSON_URL,
&fetched_version,
3 * 60 * 60,
)?;
)
.await?;
Ok(fetched_version)
}
}
@ -97,7 +98,9 @@ mod unit_test {
#[tokio::test]
async fn check_version() {
// delete caches in case you run this test multiple times
cache::delete_one(cache::Category::FetchUrl, UPSTREAM_PACKAGE_JSON_URL).unwrap();
cache::delete_one(cache::Category::FetchUrl, UPSTREAM_PACKAGE_JSON_URL)
.await
.unwrap();
// fetch from firefish.dev
validate_version(latest_version().await.unwrap());

View file

@ -5,7 +5,7 @@ use crate::misc::check_hit_antenna::{check_hit_antenna, AntennaCheckError};
use crate::model::entity::{antenna, note};
use crate::service::stream;
use crate::util::id::{get_timestamp, InvalidIdErr};
use redis::{streams::StreamMaxlen, Commands, RedisError};
use redis::{streams::StreamMaxlen, AsyncCommands, RedisError};
use sea_orm::{DbErr, EntityTrait};
#[derive(thiserror::Error, Debug)]
@ -33,9 +33,9 @@ type Note = note::Model;
async fn antennas() -> Result<Vec<Antenna>, Error> {
const CACHE_KEY: &str = "antennas";
Ok(cache::get::<Vec<Antenna>>(CACHE_KEY)?.unwrap_or({
Ok(cache::get::<Vec<Antenna>>(CACHE_KEY).await?.unwrap_or({
let antennas = antenna::Entity::find().all(db_conn().await?).await?;
cache::set(CACHE_KEY, &antennas, 5 * 60)?;
cache::set(CACHE_KEY, &antennas, 5 * 60).await?;
antennas
}))
}
@ -52,22 +52,25 @@ pub async fn update_antennas_on_new_note(
continue;
}
if check_hit_antenna(antenna, note.clone(), note_author).await? {
add_note_to_antenna(&antenna.id, &note)?;
add_note_to_antenna(&antenna.id, &note).await?;
}
}
Ok(())
}
pub fn add_note_to_antenna(antenna_id: &str, note: &Note) -> Result<(), Error> {
pub async fn add_note_to_antenna(antenna_id: &str, note: &Note) -> Result<(), Error> {
// for timeline API
redis_conn()?.xadd_maxlen(
redis_key(format!("antennaTimeline:{}", antenna_id)),
StreamMaxlen::Approx(200),
format!("{}-*", get_timestamp(&note.id)?),
&[("note", &note.id)],
)?;
redis_conn()
.await?
.xadd_maxlen(
redis_key(format!("antennaTimeline:{}", antenna_id)),
StreamMaxlen::Approx(200),
format!("{}-*", get_timestamp(&note.id)?),
&[("note", &note.id)],
)
.await?;
// for streaming API
Ok(stream::antenna::publish(antenna_id.to_string(), note)?)
Ok(stream::antenna::publish(antenna_id.to_string(), note).await?)
}

View file

@ -116,13 +116,13 @@ async fn generate_nodeinfo_2_1() -> Result<Nodeinfo21, Error> {
pub async fn nodeinfo_2_1() -> Result<Nodeinfo21, Error> {
const NODEINFO_2_1_CACHE_KEY: &str = "nodeinfo_2_1";
let cached = cache::get::<Nodeinfo21>(NODEINFO_2_1_CACHE_KEY)?;
let cached = cache::get::<Nodeinfo21>(NODEINFO_2_1_CACHE_KEY).await?;
if let Some(nodeinfo) = cached {
Ok(nodeinfo)
} else {
let nodeinfo = generate_nodeinfo_2_1().await?;
cache::set(NODEINFO_2_1_CACHE_KEY, &nodeinfo, 60 * 60)?;
cache::set(NODEINFO_2_1_CACHE_KEY, &nodeinfo, 60 * 60).await?;
Ok(nodeinfo)
}
}

View file

@ -8,7 +8,7 @@ pub mod moderation;
use crate::config::CONFIG;
use crate::database::redis_conn;
use redis::{Commands, RedisError};
use redis::{AsyncCommands, RedisError};
#[derive(strum::Display)]
pub enum Stream {
@ -55,7 +55,7 @@ pub enum Error {
ValueError(String),
}
pub fn publish_to_stream(
pub async fn publish_to_stream(
stream: &Stream,
kind: Option<String>,
value: Option<String>,
@ -70,10 +70,13 @@ pub fn publish_to_stream(
value.ok_or(Error::ValueError("Invalid streaming message".to_string()))?
};
redis_conn()?.publish(
&CONFIG.host,
format!("{{\"channel\":\"{}\",\"message\":{}}}", stream, message),
)?;
redis_conn()
.await?
.publish(
&CONFIG.host,
format!("{{\"channel\":\"{}\",\"message\":{}}}", stream, message),
)
.await?;
Ok(())
}

View file

@ -1,10 +1,11 @@
use crate::model::entity::note;
use crate::service::stream::{publish_to_stream, Error, Stream};
pub fn publish(antenna_id: String, note: &note::Model) -> Result<(), Error> {
pub async fn publish(antenna_id: String, note: &note::Model) -> Result<(), Error> {
publish_to_stream(
&Stream::Antenna { antenna_id },
Some("note".to_string()),
Some(serde_json::to_string(note)?),
)
.await
}

View file

@ -1,10 +1,11 @@
use crate::service::stream::{publish_to_stream, Error, Stream};
#[crate::export(js_name = "publishToChannelStream")]
pub fn publish(channel_id: String, user_id: String) -> Result<(), Error> {
pub async fn publish(channel_id: String, user_id: String) -> Result<(), Error> {
publish_to_stream(
&Stream::Channel { channel_id },
Some("typing".to_string()),
Some(format!("\"{}\"", user_id)),
)
.await
}

View file

@ -17,7 +17,7 @@ pub enum ChatEvent {
// https://github.com/napi-rs/napi-rs/issues/2036
#[crate::export(js_name = "publishToChatStream")]
pub fn publish(
pub async fn publish(
sender_user_id: String,
receiver_user_id: String,
kind: ChatEvent,
@ -31,4 +31,5 @@ pub fn publish(
Some(kind.to_string()),
Some(serde_json::to_string(object)?),
)
.await
}

View file

@ -13,7 +13,7 @@ pub enum ChatIndexEvent {
// https://github.com/napi-rs/napi-rs/issues/2036
#[crate::export(js_name = "publishToChatIndexStream")]
pub fn publish(
pub async fn publish(
user_id: String,
kind: ChatIndexEvent,
object: &serde_json::Value,
@ -23,4 +23,5 @@ pub fn publish(
Some(kind.to_string()),
Some(serde_json::to_string(object)?),
)
.await
}

View file

@ -18,10 +18,11 @@ pub struct PackedEmoji {
}
#[crate::export(js_name = "publishToBroadcastStream")]
pub fn publish(emoji: &PackedEmoji) -> Result<(), Error> {
pub async fn publish(emoji: &PackedEmoji) -> Result<(), Error> {
publish_to_stream(
&Stream::CustomEmoji,
Some("emojiAdded".to_string()),
Some(format!("{{\"emoji\":{}}}", serde_json::to_string(emoji)?)),
)
.await
}

View file

@ -4,10 +4,15 @@ use crate::service::stream::{chat::ChatEvent, publish_to_stream, Error, Stream};
// https://github.com/napi-rs/napi-rs/issues/2036
#[crate::export(js_name = "publishToGroupChatStream")]
pub fn publish(group_id: String, kind: ChatEvent, object: &serde_json::Value) -> Result<(), Error> {
pub async fn publish(
group_id: String,
kind: ChatEvent,
object: &serde_json::Value,
) -> Result<(), Error> {
publish_to_stream(
&Stream::GroupChat { group_id },
Some(kind.to_string()),
Some(serde_json::to_string(object)?),
)
.await
}

View file

@ -12,10 +12,11 @@ pub struct AbuseUserReportLike {
}
#[crate::export(js_name = "publishToModerationStream")]
pub fn publish(moderator_id: String, report: &AbuseUserReportLike) -> Result<(), Error> {
pub async fn publish(moderator_id: String, report: &AbuseUserReportLike) -> Result<(), Error> {
publish_to_stream(
&Stream::Moderation { moderator_id },
Some("newAbuseUserReport".to_string()),
Some(serde_json::to_string(report)?),
)
.await
}

View file

@ -57,13 +57,15 @@ export async function readUserMessagingMessage(
);
// Publish event
publishToChatStream(otherpartyId, userId, ChatEvent.Read, messageIds);
publishToChatIndexStream(userId, ChatIndexEvent.Read, messageIds);
await Promise.all([
publishToChatStream(otherpartyId, userId, ChatEvent.Read, messageIds),
publishToChatIndexStream(userId, ChatIndexEvent.Read, messageIds),
]);
if (!(await Users.getHasUnreadMessagingMessage(userId))) {
// 全ての(いままで未読だった)自分宛てのメッセージを(これで)読みましたよというイベントを発行
publishMainStream(userId, "readAllMessagingMessages");
sendPushNotification(userId, PushNotificationKind.ReadAllChats, {});
await sendPushNotification(userId, PushNotificationKind.ReadAllChats, {});
} else {
// そのユーザーとのメッセージで未読がなければイベント発行
const hasUnread = await MessagingMessages.exists({
@ -75,9 +77,13 @@ export async function readUserMessagingMessage(
});
if (!hasUnread) {
sendPushNotification(userId, PushNotificationKind.ReadAllChatsInTheRoom, {
userId: otherpartyId,
});
await sendPushNotification(
userId,
PushNotificationKind.ReadAllChatsInTheRoom,
{
userId: otherpartyId,
},
);
}
}
}
@ -127,17 +133,19 @@ export async function readGroupMessagingMessage(
reads.push(message.id);
}
// Publish event
publishToGroupChatStream(groupId, ChatEvent.Read, {
ids: reads,
userId,
});
publishToChatIndexStream(userId, ChatIndexEvent.Read, reads);
// Publish events
await Promise.all([
publishToGroupChatStream(groupId, ChatEvent.Read, {
ids: reads,
userId,
}),
publishToChatIndexStream(userId, ChatIndexEvent.Read, reads),
]);
if (!(await Users.getHasUnreadMessagingMessage(userId))) {
// 全ての(いままで未読だった)自分宛てのメッセージを(これで)読みましたよというイベントを発行
publishMainStream(userId, "readAllMessagingMessages");
sendPushNotification(userId, PushNotificationKind.ReadAllChats, {});
await sendPushNotification(userId, PushNotificationKind.ReadAllChats, {});
} else {
// そのグループにおいて未読がなければイベント発行
const hasUnread = await MessagingMessages.createQueryBuilder("message")
@ -151,9 +159,13 @@ export async function readGroupMessagingMessage(
.then((x) => x != null);
if (!hasUnread) {
sendPushNotification(userId, PushNotificationKind.ReadAllChatsInTheRoom, {
groupId,
});
await sendPushNotification(
userId,
PushNotificationKind.ReadAllChatsInTheRoom,
{
groupId,
},
);
}
}
}

View file

@ -26,8 +26,8 @@ export async function readNotification(
if (result.affected === 0) return;
if (!(await Users.getHasUnreadNotification(userId)))
return postReadAllNotifications(userId);
else return postReadNotifications(userId, notificationIds);
return await postReadAllNotifications(userId);
else return await postReadNotifications(userId, notificationIds);
}
export async function readNotificationByQuery(

View file

@ -77,9 +77,10 @@ export default define(meta, paramDef, async (ps, me) => {
height: size?.height || null,
}).then((x) => Emojis.findOneByOrFail(x.identifiers[0]));
await db.queryResultCache!.remove(["meta_emojis"]);
publishToBroadcastStream(await Emojis.pack(emoji));
await Promise.all([
db.queryResultCache!.remove(["meta_emojis"]),
publishToBroadcastStream(await Emojis.pack(emoji)),
]);
insertModerationLog(me, "addEmoji", {
emojiId: emoji.id,

View file

@ -104,9 +104,10 @@ export default define(meta, paramDef, async (ps, me) => {
height: size?.height ?? null,
}).then((x) => Emojis.findOneByOrFail(x.identifiers[0]));
await db.queryResultCache!.remove(["meta_emojis"]);
publishToBroadcastStream(await Emojis.pack(copied));
await Promise.all([
db.queryResultCache!.remove(["meta_emojis"]),
publishToBroadcastStream(await Emojis.pack(copied)),
]);
return {
id: copied.id,

View file

@ -102,7 +102,7 @@ export default define(meta, paramDef, async (ps, user) => {
acceptAllFollowRequests(user);
}
publishToFollowers(user.id);
await publishToFollowers(user.id);
return iObj;
});

View file

@ -351,11 +351,11 @@ export default define(meta, paramDef, async (ps, _user, token) => {
// 鍵垢を解除したとき、溜まっていたフォローリクエストがあるならすべて承認
if (user.isLocked && ps.isLocked === false) {
acceptAllFollowRequests(user);
await acceptAllFollowRequests(user);
}
// フォロワーにUpdateを配信
publishToFollowers(user.id);
await publishToFollowers(user.id);
return iObj;
});

View file

@ -31,5 +31,9 @@ export default define(meta, paramDef, async (_, user) => {
// 全ての通知を読みましたよというイベントを発行
publishMainStream(user.id, "readAllNotifications");
sendPushNotification(user.id, PushNotificationKind.ReadAllNotifications, {});
await sendPushNotification(
user.id,
PushNotificationKind.ReadAllNotifications,
{},
);
});

View file

@ -84,8 +84,8 @@ export default define(meta, paramDef, async (ps, me) => {
],
});
for (const moderator of moderators) {
publishToModerationStream(moderator.id, {
for await (const moderator of moderators) {
await publishToModerationStream(moderator.id, {
id: report.id,
targetUserId: report.targetUserId,
reporterId: report.reporterId,

View file

@ -287,10 +287,10 @@ export default class Connection {
// クライアントの事情を考慮したとき、入力フォームはノートチャンネルやメッセージのメインコンポーネントとは別
// なこともあるため、それらのコンポーネントがそれぞれ各チャンネルに接続するようにするのは面倒なため。
case "typingOnChannel":
this.typingOnChannel(body.channel);
await this.typingOnChannel(body.channel);
break;
case "typingOnMessaging":
this.typingOnMessaging(body);
await this.typingOnMessaging(body);
break;
}
}
@ -513,26 +513,30 @@ export default class Connection {
}
}
private typingOnChannel(channelId: ChannelModel["id"]) {
private async typingOnChannel(channelId: ChannelModel["id"]) {
if (this.user) {
publishToChannelStream(channelId, this.user.id);
await publishToChannelStream(channelId, this.user.id);
}
}
private typingOnMessaging(param: {
private async typingOnMessaging(param: {
partner?: User["id"];
group?: UserGroup["id"];
}) {
if (this.user) {
if (param.partner) {
publishToChatStream(
await publishToChatStream(
param.partner,
this.user.id,
ChatEvent.Typing,
this.user.id,
);
} else if (param.group != null) {
publishToGroupChatStream(param.group, ChatEvent.Typing, this.user.id);
await publishToGroupChatStream(
param.group,
ChatEvent.Typing,
this.user.id,
);
}
}
}

View file

@ -85,7 +85,11 @@ export async function createNotification(
if (fresh == null) return; // 既に削除されているかもしれない
// We execute this before, because the server side "read" check doesnt work well with push notifications, the app and service worker will decide themself
// when it is best to show push notifications
sendPushNotification(notifieeId, PushNotificationKind.Generic, packed);
await sendPushNotification(
notifieeId,
PushNotificationKind.Generic,
packed,
);
if (fresh.isRead) return;
//#region ただしミュートしているユーザーからの通知なら無視

View file

@ -55,46 +55,54 @@ export async function createMessage(
if (recipientUser) {
if (Users.isLocalUser(user)) {
// 自分のストリーム
publishToChatStream(
message.userId,
recipientUser.id,
ChatEvent.Message,
messageObj,
);
publishToChatIndexStream(
message.userId,
ChatIndexEvent.Message,
messageObj,
);
// my stream
await Promise.all([
publishToChatStream(
message.userId,
recipientUser.id,
ChatEvent.Message,
messageObj,
),
publishToChatIndexStream(
message.userId,
ChatIndexEvent.Message,
messageObj,
),
]);
publishMainStream(message.userId, "messagingMessage", messageObj);
}
if (Users.isLocalUser(recipientUser)) {
// 相手のストリーム
publishToChatStream(
recipientUser.id,
message.userId,
ChatEvent.Message,
messageObj,
);
publishToChatIndexStream(
recipientUser.id,
ChatIndexEvent.Message,
messageObj,
);
// recipient's stream
await Promise.all([
publishToChatStream(
recipientUser.id,
message.userId,
ChatEvent.Message,
messageObj,
),
publishToChatIndexStream(
recipientUser.id,
ChatIndexEvent.Message,
messageObj,
),
]);
publishMainStream(recipientUser.id, "messagingMessage", messageObj);
}
} else if (recipientGroup != null) {
// group's stream
publishToGroupChatStream(recipientGroup.id, ChatEvent.Message, messageObj);
await publishToGroupChatStream(
recipientGroup.id,
ChatEvent.Message,
messageObj,
);
// member's stream
const joinings = await UserGroupJoinings.findBy({
userGroupId: recipientGroup.id,
});
for (const joining of joinings) {
publishToChatIndexStream(
for await (const joining of joinings) {
await publishToChatIndexStream(
joining.userId,
ChatIndexEvent.Message,
messageObj,
@ -119,7 +127,7 @@ export async function createMessage(
//#endregion
publishMainStream(recipientUser.id, "unreadMessagingMessage", messageObj);
sendPushNotification(
await sendPushNotification(
recipientUser.id,
PushNotificationKind.Chat,
messageObj,
@ -129,10 +137,10 @@ export async function createMessage(
userGroupId: recipientGroup.id,
userId: Not(user.id),
});
for (const joining of joinings) {
for await (const joining of joinings) {
if (freshMessage.reads.includes(joining.userId)) return; // 既読
publishMainStream(joining.userId, "unreadMessagingMessage", messageObj);
sendPushNotification(
await sendPushNotification(
joining.userId,
PushNotificationKind.Chat,
messageObj,

View file

@ -18,18 +18,20 @@ export async function deleteMessage(message: MessagingMessage) {
async function postDeleteMessage(message: MessagingMessage) {
if (message.recipientId) {
const user = await Users.findOneByOrFail({ id: message.userId });
const recipient = await Users.findOneByOrFail({ id: message.recipientId });
const [user, recipient] = await Promise.all([
Users.findOneByOrFail({ id: message.userId }),
Users.findOneByOrFail({ id: message.recipientId }),
]);
if (Users.isLocalUser(user))
publishToChatStream(
await publishToChatStream(
message.userId,
message.recipientId,
ChatEvent.Deleted,
message.id,
);
if (Users.isLocalUser(recipient))
publishToChatStream(
await publishToChatStream(
message.recipientId,
message.userId,
ChatEvent.Deleted,
@ -46,6 +48,10 @@ async function postDeleteMessage(message: MessagingMessage) {
deliver(user, activity, recipient.inbox);
}
} else if (message.groupId != null) {
publishToGroupChatStream(message.groupId, ChatEvent.Deleted, message.id);
await publishToGroupChatStream(
message.groupId,
ChatEvent.Deleted,
message.id,
);
}
}