diff --git a/packages/backend/crates/activitypub/Cargo.toml b/packages/backend/crates/activitypub/Cargo.toml index 0cf96652ba..d8837610e5 100644 --- a/packages/backend/crates/activitypub/Cargo.toml +++ b/packages/backend/crates/activitypub/Cargo.toml @@ -14,3 +14,11 @@ serde = { version = "1.0.163", features = ["derive"] } serde_json = { version = "1.0.96", features = ["preserve_order"] } thiserror = "1.0.40" url = { version = "2.3.1", features = ["serde"] } +derive_builder = "0.12.0" +reqwest = { version = "0.11.17", features = [ + "json", + "stream", + "rustls", + "rustls-native-certs", +] } +reqwest-middleware = "0.2.2" diff --git a/packages/backend/crates/activitypub/src/federation/config.rs b/packages/backend/crates/activitypub/src/federation/config.rs index e69de29bb2..cd8e70266f 100644 --- a/packages/backend/crates/activitypub/src/federation/config.rs +++ b/packages/backend/crates/activitypub/src/federation/config.rs @@ -0,0 +1,255 @@ +// GNU Affero General Public License v3.0 +// https://github.com/LemmyNet/activitypub-federation-rust + +//! Configuration for this library, with various federation settings +//! +//! Use [FederationConfig::builder](crate::config::FederationConfig::builder) to initialize it. +//! +//! ``` +//! # use activitypub_federation::config::FederationConfig; +//! # let _ = actix_rt::System::new(); +//! let settings = FederationConfig::builder() +//! .domain("example.com") +//! .app_data(()) +//! .http_fetch_limit(50) +//! .worker_count(16) +//! .build()?; +//! # Ok::<(), anyhow::Error>(()) +//! ``` + +use crate::{ + activity_queue::create_activity_queue, error::Error, + protocol::verification::verify_domains_match, traits::ActivityHandler, +}; +use async_trait::async_trait; +use derive_builder::Builder; +use reqwest_middleware::ClientWithMiddleware; +use serde::de::DeserializeOwned; +use std::{ + ops::Deref, + sync::{ + atomic::{AtomicU32, Ordering}, + Arc, + }, + time::Duration, +}; +use url::Url; + +/// Configuration for this library, with various federation related settings +#[derive(Builder, Clone)] +#[builder(build_fn(private, name = "partial_build"))] +pub struct FederationConfig { + /// The domain where this federated instance is running + #[builder(setter(into))] + pub(crate) domain: String, + /// Data which the application requires in handlers, such as database connection + /// or configuration. + pub(crate) app_data: T, + /// Maximum number of outgoing HTTP requests per incoming HTTP request. See + /// [crate::fetch::object_id::ObjectId] for more details. + #[builder(default = "20")] + pub(crate) http_fetch_limit: u32, + #[builder(default = "reqwest::Client::default().into()")] + /// HTTP client used for all outgoing requests. Middleware can be used to add functionality + /// like log tracing or retry of failed requests. + pub(crate) client: ClientWithMiddleware, + /// Number of worker threads for sending outgoing activities + #[builder(default = "64")] + pub(crate) worker_count: u64, + /// Run library in debug mode. This allows usage of http and localhost urls. It also sends + /// outgoing activities synchronously, not in background thread. This helps to make tests + /// more consistent. Do not use for production. + #[builder(default = "false")] + pub(crate) debug: bool, + /// Timeout for all HTTP requests. HTTP signatures are valid for 10s, so it makes sense to + /// use the same as timeout when sending + #[builder(default = "Duration::from_secs(10)")] + pub(crate) request_timeout: Duration, + /// Enable to sign HTTP signatures according to draft 10, which does not include (created) and + /// (expires) fields. This is required for compatibility with some software like Pleroma. + /// + /// + #[builder(default = "false")] + pub(crate) http_signature_compat: bool, + // Queue for sending outgoing activities. Only optional to make builder work, its always + // present once constructed. + // #[builder(setter(skip))] + // pub(crate) activity_queue: Option>, +} + +impl FederationConfig { + /// Returns a new config builder with default values. + pub fn builder() -> FederationConfigBuilder { + FederationConfigBuilder::default() + } + + pub(crate) async fn verify_url_and_domain( + &self, + activity: &Activity, + ) -> Result<(), Error> + where + Activity: ActivityHandler + DeserializeOwned + Send + 'static, + { + verify_domains_match(activity.id(), activity.actor())?; + self.verify_url_valid(activity.id()).await?; + if self.is_local_url(activity.id()) { + return Err(Error::UrlVerificationError( + "Activity was sent from local instance", + )); + } + + Ok(()) + } + + /// Create new [Data] from this. You should prefer to use a middleware if possible. + pub fn to_request_data(&self) -> Data { + Data { + config: self.clone(), + request_counter: Default::default(), + } + } + + /// Perform some security checks on URLs as mentioned in activitypub spec, and call user-supplied + /// [`InstanceSettings.verify_url_function`]. + /// + /// https://www.w3.org/TR/activitypub/#security-considerations + pub(crate) async fn verify_url_valid(&self, url: &Url) -> Result<(), Error> { + match url.scheme() { + "https" => {} + "http" => { + if !self.debug { + return Err(Error::UrlVerificationError( + "Http urls are only allowed in debug mode", + )); + } + } + _ => return Err(Error::UrlVerificationError("Invalid url scheme")), + }; + + // Urls which use our local domain are not a security risk, no further verification needed + if self.is_local_url(url) { + return Ok(()); + } + + if url.domain().is_none() { + return Err(Error::UrlVerificationError("Url must have a domain")); + } + + if url.domain() == Some("localhost") && !self.debug { + return Err(Error::UrlVerificationError( + "Localhost is only allowed in debug mode", + )); + } + + self.url_verifier + .verify(url) + .await + .map_err(Error::UrlVerificationError)?; + + Ok(()) + } + + /// Returns true if the url refers to this instance. Handles hostnames like `localhost:8540` for + /// local debugging. + pub(crate) fn is_local_url(&self, url: &Url) -> bool { + let mut domain = url.domain().expect("id has domain").to_string(); + if let Some(port) = url.port() { + domain = format!("{}:{}", domain, port); + } + domain == self.domain + } + + /// Returns the local domain + pub fn domain(&self) -> &str { + &self.domain + } +} + +impl FederationConfigBuilder { + /// Constructs a new config instance with the values supplied to builder. + /// + /// Values which are not explicitly specified use the defaults. Also initializes the + /// queue for outgoing activities, which is stored internally in the config struct. + pub fn build(&mut self) -> Result, FederationConfigBuilderError> { + let mut config = self.partial_build()?; + let stats_handler = background_jobs::metrics::install().ok(); + config.queue_metrics = if let Some(stats) = stats_handler { + Some(Arc::new(stats)) + } else { + None + }; + let queue = create_activity_queue( + config.client.clone(), + config.worker_count, + config.request_timeout, + config.debug, + config.queue_db.to_owned(), + ); + config.activity_queue = Some(Arc::new(queue)); + Ok(config) + } +} + +impl Deref for FederationConfig { + type Target = T; + + fn deref(&self) -> &Self::Target { + &self.app_data + } +} + +/// Stores data for handling one specific HTTP request. +/// +/// It gives acess to the `app_data` which was passed to [FederationConfig::builder]. +/// +/// Additionally it contains a counter for outgoing HTTP requests. This is necessary to +/// prevent denial of service attacks, where an attacker triggers fetching of recursive objects. +/// +/// +pub struct Data { + pub(crate) config: FederationConfig, + pub(crate) request_counter: AtomicU32, +} + +impl Data { + /// Returns the data which was stored in [FederationConfigBuilder::app_data] + pub fn app_data(&self) -> &T { + &self.config.app_data + } + + /// The domain that was configured in [FederationConfig]. + pub fn domain(&self) -> &str { + &self.config.domain + } + + /// Returns a new instance of `Data` with request counter set to 0. + pub fn reset_request_count(&self) -> Self { + Data { + config: self.config.clone(), + request_counter: Default::default(), + } + } + /// Total number of outgoing HTTP requests made with this data. + pub fn request_count(&self) -> u32 { + self.request_counter.load(Ordering::Relaxed) + } +} + +impl Deref for Data { + type Target = T; + + fn deref(&self) -> &T { + &self.config.app_data + } +} + +/// Middleware for HTTP handlers which provides access to [Data] +#[derive(Clone)] +pub struct FederationMiddleware(pub(crate) FederationConfig); + +impl FederationMiddleware { + /// Construct a new middleware instance + pub fn new(config: FederationConfig) -> Self { + FederationMiddleware(config) + } +} diff --git a/packages/backend/crates/activitypub/src/federation/mod.rs b/packages/backend/crates/activitypub/src/federation/mod.rs index eddbd04152..454641e63a 100644 --- a/packages/backend/crates/activitypub/src/federation/mod.rs +++ b/packages/backend/crates/activitypub/src/federation/mod.rs @@ -1,2 +1,3 @@ +mod config; mod error; mod protocol;