refactor (backend-rs): use bb8 connection pool for Redis
This commit is contained in:
parent
ab3ca2a20b
commit
abc9d58f7c
5 changed files with 49 additions and 17 deletions
|
@ -1,4 +1,4 @@
|
|||
use crate::database::{redis_conn, redis_key};
|
||||
use crate::database::{redis_conn, redis_key, RedisConnError};
|
||||
use redis::{AsyncCommands, RedisError};
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
|
@ -18,11 +18,13 @@ pub enum Category {
|
|||
#[derive(thiserror::Error, Debug)]
|
||||
pub enum Error {
|
||||
#[error("Redis error: {0}")]
|
||||
RedisError(#[from] RedisError),
|
||||
RedisErr(#[from] RedisError),
|
||||
#[error("Redis connection error: {0}")]
|
||||
RedisConnErr(#[from] RedisConnError),
|
||||
#[error("Data serialization error: {0}")]
|
||||
SerializeError(#[from] rmp_serde::encode::Error),
|
||||
SerializeErr(#[from] rmp_serde::encode::Error),
|
||||
#[error("Data deserialization error: {0}")]
|
||||
DeserializeError(#[from] rmp_serde::decode::Error),
|
||||
DeserializeErr(#[from] rmp_serde::decode::Error),
|
||||
}
|
||||
|
||||
#[inline]
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
pub use postgresql::db_conn;
|
||||
pub use redis::key as redis_key;
|
||||
pub use redis::redis_conn;
|
||||
pub use redis::RedisConnError;
|
||||
|
||||
pub mod cache;
|
||||
pub mod postgresql;
|
||||
|
|
|
@ -1,9 +1,10 @@
|
|||
use crate::config::CONFIG;
|
||||
use async_trait::async_trait;
|
||||
use once_cell::sync::OnceCell;
|
||||
use bb8::{ManageConnection, Pool, PooledConnection, RunError};
|
||||
use redis::{aio::MultiplexedConnection, Client, ErrorKind, IntoConnectionInfo, RedisError};
|
||||
use tokio::sync::OnceCell;
|
||||
|
||||
/// A `bb8::ManageConnection` for `redis::Client::get_async_connection`.
|
||||
/// A `bb8::ManageConnection` for `redis::Client::get_multiplexed_async_connection`.
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct RedisConnectionManager {
|
||||
client: Client,
|
||||
|
@ -20,7 +21,7 @@ impl RedisConnectionManager {
|
|||
}
|
||||
|
||||
#[async_trait]
|
||||
impl bb8::ManageConnection for RedisConnectionManager {
|
||||
impl ManageConnection for RedisConnectionManager {
|
||||
type Connection = MultiplexedConnection;
|
||||
type Error = RedisError;
|
||||
|
||||
|
@ -41,9 +42,9 @@ impl bb8::ManageConnection for RedisConnectionManager {
|
|||
}
|
||||
}
|
||||
|
||||
static REDIS_CLIENT: OnceCell<Client> = OnceCell::new();
|
||||
static CONN_POOL: OnceCell<Pool<RedisConnectionManager>> = OnceCell::const_new();
|
||||
|
||||
fn init_redis() -> Result<Client, RedisError> {
|
||||
async fn init_conn_pool() -> Result<(), RedisError> {
|
||||
let redis_url = {
|
||||
let mut params = vec!["redis://".to_owned()];
|
||||
|
||||
|
@ -66,16 +67,40 @@ fn init_redis() -> Result<Client, RedisError> {
|
|||
params.concat()
|
||||
};
|
||||
|
||||
tracing::info!("Initializing Redis client");
|
||||
tracing::info!("Initializing connection manager");
|
||||
let manager = RedisConnectionManager::new(redis_url)?;
|
||||
|
||||
Client::open(redis_url)
|
||||
tracing::info!("Creating connection pool");
|
||||
let pool = Pool::builder().build(manager).await?;
|
||||
|
||||
CONN_POOL.get_or_init(|| async { pool }).await;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn redis_conn() -> Result<MultiplexedConnection, RedisError> {
|
||||
match REDIS_CLIENT.get() {
|
||||
Some(client) => Ok(client.get_multiplexed_async_connection().await?),
|
||||
None => init_redis()?.get_multiplexed_async_connection().await,
|
||||
#[derive(thiserror::Error, Debug)]
|
||||
pub enum RedisConnError {
|
||||
#[error("Failed to initialize Redis connection pool: {0}")]
|
||||
RedisErr(RedisError),
|
||||
#[error("Redis connection pool error: {0}")]
|
||||
Bb8PoolErr(RunError<RedisError>),
|
||||
}
|
||||
|
||||
pub async fn redis_conn(
|
||||
) -> Result<PooledConnection<'static, RedisConnectionManager>, RedisConnError> {
|
||||
if !CONN_POOL.initialized() {
|
||||
let init_res = init_conn_pool().await;
|
||||
|
||||
if let Err(err) = init_res {
|
||||
return Err(RedisConnError::RedisErr(err));
|
||||
}
|
||||
}
|
||||
|
||||
CONN_POOL
|
||||
.get()
|
||||
.unwrap()
|
||||
.get()
|
||||
.await
|
||||
.map_err(RedisConnError::Bb8PoolErr)
|
||||
}
|
||||
|
||||
/// prefix redis key
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
use crate::database::cache;
|
||||
use crate::database::{db_conn, redis_conn, redis_key};
|
||||
use crate::database::{db_conn, redis_conn, redis_key, RedisConnError};
|
||||
use crate::federation::acct::Acct;
|
||||
use crate::misc::check_hit_antenna::{check_hit_antenna, AntennaCheckError};
|
||||
use crate::model::entity::{antenna, note};
|
||||
|
@ -16,6 +16,8 @@ pub enum Error {
|
|||
CacheErr(#[from] cache::Error),
|
||||
#[error("Redis error: {0}")]
|
||||
RedisErr(#[from] RedisError),
|
||||
#[error("Redis connection error: {0}")]
|
||||
RedisConnErr(#[from] RedisConnError),
|
||||
#[error("Invalid ID: {0}")]
|
||||
InvalidIdErr(#[from] InvalidIdErr),
|
||||
#[error("Stream error: {0}")]
|
||||
|
|
|
@ -7,7 +7,7 @@ pub mod group_chat;
|
|||
pub mod moderation;
|
||||
|
||||
use crate::config::CONFIG;
|
||||
use crate::database::redis_conn;
|
||||
use crate::database::{redis_conn, RedisConnError};
|
||||
use redis::{AsyncCommands, RedisError};
|
||||
|
||||
#[derive(strum::Display)]
|
||||
|
@ -49,6 +49,8 @@ pub enum Stream {
|
|||
pub enum Error {
|
||||
#[error("Redis error: {0}")]
|
||||
RedisError(#[from] RedisError),
|
||||
#[error("Redis connection error: {0}")]
|
||||
RedisConnErr(#[from] RedisConnError),
|
||||
#[error("Json (de)serialization error: {0}")]
|
||||
JsonError(#[from] serde_json::Error),
|
||||
#[error("Value error: {0}")]
|
||||
|
|
Loading…
Reference in a new issue