diff --git a/packages/backend/crates/activitypub/Cargo.toml b/packages/backend/crates/activitypub/Cargo.toml index d8837610e5..8ba9b47a50 100644 --- a/packages/backend/crates/activitypub/Cargo.toml +++ b/packages/backend/crates/activitypub/Cargo.toml @@ -8,17 +8,33 @@ edition = "2021" [dependencies] anyhow = "1.0.71" async-trait = "0.1.68" +base64 = "0.21.0" +bytes = "1.4.0" +chrono = { version = "0.4.24", default-features = false, features = ["clock"] } +derive_builder = "0.12.0" displaydoc = "0.2.4" +dyn-clone = "1.0.11" +enum_delegate = "0.2.0" +futures-core = { version = "0.3.28", default-features = false } +http = "0.2.9" +http-signature-normalization = "0.7.0" +http-signature-normalization-reqwest = { version = "0.8.0", default-features = false, features = [ + "sha-2", + "middleware", +] } +httpdate = "1.0.2" +itertools = "0.10.5" +once_cell = "1.17.1" +openssl = "0.10.52" parse-display = "0.8.0" +pin-project-lite = "0.2.9" +regex = { version = "1.8.1", default-features = false, features = ["std"] } +reqwest = { version = "0.11.17", features = ["json", "stream"] } +reqwest-middleware = "0.2.2" serde = { version = "1.0.163", features = ["derive"] } serde_json = { version = "1.0.96", features = ["preserve_order"] } +sha2 = "0.10.6" thiserror = "1.0.40" +tokio = { version = "1.28.1", features = ["test-util", "macros"] } +tracing = "0.1.37" url = { version = "2.3.1", features = ["serde"] } -derive_builder = "0.12.0" -reqwest = { version = "0.11.17", features = [ - "json", - "stream", - "rustls", - "rustls-native-certs", -] } -reqwest-middleware = "0.2.2" diff --git a/packages/backend/crates/activitypub/src/federation/config.rs b/packages/backend/crates/activitypub/src/federation/config.rs index cd8e70266f..d4090daeeb 100644 --- a/packages/backend/crates/activitypub/src/federation/config.rs +++ b/packages/backend/crates/activitypub/src/federation/config.rs @@ -17,20 +17,17 @@ //! # Ok::<(), anyhow::Error>(()) //! ``` -use crate::{ - activity_queue::create_activity_queue, error::Error, - protocol::verification::verify_domains_match, traits::ActivityHandler, +use crate::federation::{ + error::Error, protocol::verification::verify_domains_match, traits::ActivityHandler, }; use async_trait::async_trait; use derive_builder::Builder; +use dyn_clone::{clone_trait_object, DynClone}; use reqwest_middleware::ClientWithMiddleware; use serde::de::DeserializeOwned; use std::{ ops::Deref, - sync::{ - atomic::{AtomicU32, Ordering}, - Arc, - }, + sync::atomic::{AtomicU32, Ordering}, time::Duration, }; use url::Url; @@ -65,16 +62,21 @@ pub struct FederationConfig<T: Clone> { /// use the same as timeout when sending #[builder(default = "Duration::from_secs(10)")] pub(crate) request_timeout: Duration, + /// Function used to verify that urls are valid, See [UrlVerifier] for details. + #[builder(default = "Box::new(DefaultUrlVerifier())")] + pub(crate) url_verifier: Box<dyn UrlVerifier + Sync>, /// Enable to sign HTTP signatures according to draft 10, which does not include (created) and /// (expires) fields. This is required for compatibility with some software like Pleroma. /// <https://datatracker.ietf.org/doc/html/draft-cavage-http-signatures-10> /// <https://git.pleroma.social/pleroma/pleroma/-/issues/2939> #[builder(default = "false")] pub(crate) http_signature_compat: bool, - // Queue for sending outgoing activities. Only optional to make builder work, its always - // present once constructed. + /* TODO: store queue handler if necessary + /// Queue for sending outgoing activities. Only optional to make builder work, its always + /// present once constructed. // #[builder(setter(skip))] // pub(crate) activity_queue: Option<Arc<Manager>>, + */ } impl<T: Clone> FederationConfig<T> { @@ -169,23 +171,24 @@ impl<T: Clone> FederationConfigBuilder<T> { /// Constructs a new config instance with the values supplied to builder. /// /// Values which are not explicitly specified use the defaults. Also initializes the - /// queue for outgoing activities, which is stored internally in the config struct. + /// queue for activities, which is stored internally in the config struct. pub fn build(&mut self) -> Result<FederationConfig<T>, FederationConfigBuilderError> { let mut config = self.partial_build()?; - let stats_handler = background_jobs::metrics::install().ok(); - config.queue_metrics = if let Some(stats) = stats_handler { - Some(Arc::new(stats)) - } else { - None - }; - let queue = create_activity_queue( - config.client.clone(), - config.worker_count, - config.request_timeout, - config.debug, - config.queue_db.to_owned(), - ); - config.activity_queue = Some(Arc::new(queue)); + /* TODO: queue initialization here */ + // let stats_handler = background_jobs::metrics::install().ok(); + // config.queue_metrics = if let Some(stats) = stats_handler { + // Some(Arc::new(stats)) + // } else { + // None + // }; + // let queue = create_activity_queue( + // config.client.clone(), + // config.worker_count, + // config.request_timeout, + // config.debug, + // config.queue_db.to_owned(), + // ); + // config.activity_queue = Some(Arc::new(queue)); Ok(config) } } @@ -198,6 +201,59 @@ impl<T: Clone> Deref for FederationConfig<T> { } } +/// Handler for validating URLs. +/// +/// This is used for implementing domain blocklists and similar functionality. It is called +/// with the ID of newly received activities, when fetching remote data from a given URL +/// and before sending an activity to a given inbox URL. If processing for this domain/URL should +/// be aborted, return an error. In case of `Ok(())`, processing continues. +/// +/// ``` +/// # use async_trait::async_trait; +/// # use url::Url; +/// # use activitypub_federation::config::UrlVerifier; +/// # #[derive(Clone)] +/// # struct DatabaseConnection(); +/// # async fn get_blocklist(_: &DatabaseConnection) -> Vec<String> { +/// # vec![] +/// # } +/// #[derive(Clone)] +/// struct Verifier { +/// db_connection: DatabaseConnection, +/// } +/// +/// #[async_trait] +/// impl UrlVerifier for Verifier { +/// async fn verify(&self, url: &Url) -> Result<(), &'static str> { +/// let blocklist = get_blocklist(&self.db_connection).await; +/// let domain = url.domain().unwrap().to_string(); +/// if blocklist.contains(&domain) { +/// Err("Domain is blocked") +/// } else { +/// Ok(()) +/// } +/// } +/// } +/// ``` +#[async_trait] +pub trait UrlVerifier: DynClone + Send { + /// Should return Ok iff the given url is valid for processing. + async fn verify(&self, url: &Url) -> Result<(), &'static str>; +} + +/// Default URL verifier which does nothing. +#[derive(Clone)] +struct DefaultUrlVerifier(); + +#[async_trait] +impl UrlVerifier for DefaultUrlVerifier { + async fn verify(&self, _url: &Url) -> Result<(), &'static str> { + Ok(()) + } +} + +clone_trait_object!(UrlVerifier); + /// Stores data for handling one specific HTTP request. /// /// It gives acess to the `app_data` which was passed to [FederationConfig::builder]. diff --git a/packages/backend/crates/activitypub/src/federation/error.rs b/packages/backend/crates/activitypub/src/federation/error.rs index 5d3dd49c3d..37f0e2251a 100644 --- a/packages/backend/crates/activitypub/src/federation/error.rs +++ b/packages/backend/crates/activitypub/src/federation/error.rs @@ -1,10 +1,13 @@ +// GNU Affero General Public License v3.0 +// https://github.com/LemmyNet/activitypub-federation-rust + //! Error messages returned by this library use displaydoc::Display; /// Error messages returned by this library #[derive(thiserror::Error, Debug, Display)] -pub(crate) enum Error { +pub enum Error { /// Requested object was not found in local database NotFound, /// Request limit was reached during fetch @@ -13,8 +16,8 @@ pub(crate) enum Error { ResponseBodyLimit, /// Object to be fetched was deleted ObjectDeleted, - /// Url in object was invalid - UrlVerificationError, + /// Url in object was invalid: {0} + UrlVerificationError(&'static str), /// Incoming activity has invalid digest for body ActivityBodyDigestInvalid, /// Incoming activity has invalid signature @@ -27,7 +30,7 @@ pub(crate) enum Error { } impl Error { - pub(crate) fn other<T>(error: T) -> Self + pub fn other<T>(error: T) -> Self where T: Into<anyhow::Error>, { diff --git a/packages/backend/crates/activitypub/src/federation/fetch/collection_id.rs b/packages/backend/crates/activitypub/src/federation/fetch/collection_id.rs new file mode 100644 index 0000000000..ed00c18c05 --- /dev/null +++ b/packages/backend/crates/activitypub/src/federation/fetch/collection_id.rs @@ -0,0 +1,106 @@ +// GNU Affero General Public License v3.0 +// https://github.com/LemmyNet/activitypub-federation-rust + +use crate::federation::{ + config::Data, + error::Error, + fetch::fetch_object_http, + traits::{Collection, LocalActor}, +}; +use serde::{Deserialize, Serialize}; +use std::{ + fmt::{Debug, Display, Formatter}, + marker::PhantomData, +}; +use url::Url; + +/// Typed wrapper for Activitypub Collection ID which helps with dereferencing. +#[derive(Serialize, Deserialize)] +#[serde(transparent)] +pub struct CollectionId<Kind>(Box<Url>, PhantomData<Kind>) +where + Kind: Collection, + for<'de2> <Kind as Collection>::Kind: Deserialize<'de2>; + +impl<Kind> CollectionId<Kind> +where + Kind: Collection, + for<'de2> <Kind as Collection>::Kind: Deserialize<'de2>, +{ + /// Construct a new CollectionId instance + pub fn parse<T>(url: T) -> Result<Self, url::ParseError> + where + T: TryInto<Url>, + url::ParseError: From<<T as TryInto<Url>>::Error>, + { + Ok(Self(Box::new(url.try_into()?), PhantomData::<Kind>)) + } + + /// Fetches collection over HTTP + /// + /// Unlike [ObjectId::fetch](crate::fetch::object_id::ObjectId::fetch) this method doesn't do + /// any caching. + pub async fn dereference( + &self, + owner: &<Kind as Collection>::Owner, + local_actor: &impl LocalActor, + data: &Data<<Kind as Collection>::DataType>, + ) -> Result<Kind, <Kind as Collection>::Error> + where + <Kind as Collection>::Error: From<Error>, + { + let json = fetch_object_http(&self.0, local_actor, data).await?; + Kind::verify(&json, &self.0, data).await?; + Kind::from_json(json, owner, data).await + } +} + +/// Need to implement clone manually, to avoid requiring Kind to be Clone +impl<Kind> Clone for CollectionId<Kind> +where + Kind: Collection, + for<'de2> <Kind as Collection>::Kind: serde::Deserialize<'de2>, +{ + fn clone(&self) -> Self { + CollectionId(self.0.clone(), self.1) + } +} + +impl<Kind> Display for CollectionId<Kind> +where + Kind: Collection, + for<'de2> <Kind as Collection>::Kind: serde::Deserialize<'de2>, +{ + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.0.as_str()) + } +} + +impl<Kind> Debug for CollectionId<Kind> +where + Kind: Collection, + for<'de2> <Kind as Collection>::Kind: serde::Deserialize<'de2>, +{ + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.0.as_str()) + } +} +impl<Kind> From<CollectionId<Kind>> for Url +where + Kind: Collection, + for<'de2> <Kind as Collection>::Kind: serde::Deserialize<'de2>, +{ + fn from(id: CollectionId<Kind>) -> Self { + *id.0 + } +} + +impl<Kind> From<Url> for CollectionId<Kind> +where + Kind: Collection + Send + 'static, + for<'de2> <Kind as Collection>::Kind: serde::Deserialize<'de2>, +{ + fn from(url: Url) -> Self { + CollectionId(Box::new(url), PhantomData::<Kind>) + } +} diff --git a/packages/backend/crates/activitypub/src/federation/fetch/mod.rs b/packages/backend/crates/activitypub/src/federation/fetch/mod.rs new file mode 100644 index 0000000000..beed005fc6 --- /dev/null +++ b/packages/backend/crates/activitypub/src/federation/fetch/mod.rs @@ -0,0 +1,98 @@ +// GNU Affero General Public License v3.0 +// https://github.com/LemmyNet/activitypub-federation-rust + +//! Utilities for fetching data from other servers + +use crate::federation::{ + config::Data, error::Error, http_signature::sign_request, reqwest_shim::ResponseExt, + traits::LocalActor, FEDERATION_CONTENT_TYPE, +}; +use http::{HeaderMap, HeaderName, HeaderValue, StatusCode}; +use httpdate::fmt_http_date; +use serde::de::DeserializeOwned; +use std::{sync::atomic::Ordering, time::SystemTime}; +use tracing::info; +use url::Url; + +/// Typed wrapper for collection IDs +pub mod collection_id; +/// Typed wrapper for Activitypub Object ID which helps with dereferencing and caching +pub mod object_id; +/// Resolves identifiers of the form `name@example.com` +pub mod webfinger; + +/// Fetch a remote object over HTTP and convert to `Kind`. +/// +/// [crate::fetch::object_id::ObjectId::dereference] wraps this function to add caching and +/// conversion to database type. Only use this function directly in exceptional cases where that +/// behaviour is undesired. +/// +/// Every time an object is fetched via HTTP, [RequestData.request_counter] is incremented by one. +/// If the value exceeds [FederationSettings.http_fetch_limit], the request is aborted with +/// [Error::RequestLimit]. This prevents denial of service attacks where an attack triggers +/// infinite, recursive fetching of data. +pub async fn fetch_object_http<T: Clone, Kind: DeserializeOwned>( + url: &Url, + local_actor: &impl LocalActor, + data: &Data<T>, +) -> Result<Kind, Error> { + let config = &data.config; + // dont fetch local objects this way + debug_assert!(url.domain() != Some(&config.domain)); + config.verify_url_valid(url).await?; + info!("Fetching remote object {}", url.to_string()); + + let counter = data.request_counter.fetch_add(1, Ordering::SeqCst); + if counter > config.http_fetch_limit { + return Err(Error::RequestLimit); + } + + let req = config + .client + .get(url.as_str()) + .header("Accept", FEDERATION_CONTENT_TYPE) + .timeout(config.request_timeout); + + let signed_req = sign_request( + req, + local_actor.federation_id(), + String::new(), + LocalActor::private_key_pem(local_actor).to_string(), + false, + ) + .await?; + + let res = config + .client + .execute(signed_req) + .await + .map_err(Error::other)?; + + if res.status() == StatusCode::GONE { + return Err(Error::ObjectDeleted); + } + + res.json_limited().await +} + +pub(crate) fn generate_request_headers(inbox_url: &Url) -> HeaderMap { + let mut host = inbox_url.domain().expect("read inbox domain").to_string(); + if let Some(port) = inbox_url.port() { + host = format!("{}:{}", host, port); + } + + let mut headers = HeaderMap::new(); + headers.insert( + HeaderName::from_static("content-type"), + HeaderValue::from_static(FEDERATION_CONTENT_TYPE), + ); + headers.insert( + HeaderName::from_static("host"), + HeaderValue::from_str(&host).expect("Hostname is valid"), + ); + headers.insert( + "date", + HeaderValue::from_str(&fmt_http_date(SystemTime::now())).expect("Date is valid"), + ); + headers +} diff --git a/packages/backend/crates/activitypub/src/federation/fetch/object_id.rs b/packages/backend/crates/activitypub/src/federation/fetch/object_id.rs new file mode 100644 index 0000000000..a3e190747f --- /dev/null +++ b/packages/backend/crates/activitypub/src/federation/fetch/object_id.rs @@ -0,0 +1,282 @@ +// GNU Affero General Public License v3.0 +// https://github.com/LemmyNet/activitypub-federation-rust + +use crate::federation::{ + config::Data, + error::Error, + fetch::fetch_object_http, + traits::{LocalActor, Object}, +}; +use anyhow::anyhow; +use chrono::{Duration as ChronoDuration, NaiveDateTime, Utc}; +use serde::{Deserialize, Serialize}; +use std::{ + fmt::{Debug, Display, Formatter}, + marker::PhantomData, + str::FromStr, +}; +use url::Url; + +impl<T> FromStr for ObjectId<T> +where + T: Object + Send + 'static, + for<'de2> <T as Object>::Kind: Deserialize<'de2>, +{ + type Err = url::ParseError; + + fn from_str(s: &str) -> Result<Self, Self::Err> { + ObjectId::parse(s) + } +} +/// Typed wrapper for Activitypub Object ID which helps with dereferencing and caching. +/// +/// It provides convenient methods for fetching the object from remote server or local database. +/// Objects are automatically cached locally, so they don't have to be fetched every time. Much of +/// the crate functionality relies on this wrapper. +/// +/// Every time an object is fetched via HTTP, [RequestData.request_counter] is incremented by one. +/// If the value exceeds [FederationSettings.http_fetch_limit], the request is aborted with +/// [Error::RequestLimit]. This prevents denial of service attacks where an attack triggers +/// infinite, recursive fetching of data. +/// +/// ```rust +/// # use activitypub_federation::fetch::object_id::ObjectId; +/// # use activitypub_federation::config::FederationConfig; +/// # use activitypub_federation::error::Error::NotFound; +/// # use activitypub_federation::traits::tests::{DbConnection, DbUser, DB_LOCAL_USER}; +/// # let _ = actix_rt::System::new(); +/// # actix_rt::Runtime::new().unwrap().block_on(async { +/// # let db_connection = DbConnection; +/// let config = FederationConfig::builder() +/// .domain("example.com") +/// .app_data(db_connection) +/// .build()?; +/// let request_data = config.to_request_data(); +/// let object_id = ObjectId::<DbUser>::parse("https://lemmy.ml/u/nutomic")?; +/// // Attempt to fetch object from local database or fall back to remote server +/// let user = object_id.dereference(&DB_LOCAL_USER.clone(), &request_data).await; +/// assert!(user.is_ok()); +/// // Now you can also read the object from local database without network requests +/// let user = object_id.dereference_local(&request_data).await; +/// assert!(user.is_ok()); +/// # Ok::<(), anyhow::Error>(()) +/// # }).unwrap(); +/// ``` +#[derive(Serialize, Deserialize)] +#[serde(transparent)] +pub struct ObjectId<Kind>(Box<Url>, PhantomData<Kind>) +where + Kind: Object, + for<'de2> <Kind as Object>::Kind: serde::Deserialize<'de2>; + +impl<Kind> ObjectId<Kind> +where + Kind: Object + Send + 'static, + for<'de2> <Kind as Object>::Kind: serde::Deserialize<'de2>, +{ + /// Construct a new objectid instance + pub fn parse<T>(url: T) -> Result<Self, url::ParseError> + where + T: TryInto<Url>, + url::ParseError: From<<T as TryInto<Url>>::Error>, + { + Ok(ObjectId(Box::new(url.try_into()?), PhantomData::<Kind>)) + } + + /// Returns a reference to the wrapped URL value + pub fn inner(&self) -> &Url { + &self.0 + } + + /// Returns the wrapped URL value + pub fn into_inner(self) -> Url { + *self.0 + } + + /// Fetches an activitypub object, either from local database (if possible), or over http. + pub async fn dereference( + &self, + local_actor: &impl LocalActor, + data: &Data<<Kind as Object>::DataType>, + ) -> Result<Kind, <Kind as Object>::Error> + where + <Kind as Object>::Error: From<Error> + From<anyhow::Error>, + { + let db_object = self.dereference_from_db(data).await?; + + // if its a local object, only fetch it from the database and not over http + if data.config.is_local_url(&self.0) { + return match db_object { + None => Err(Error::NotFound.into()), + Some(o) => Ok(o), + }; + } + + // object found in database + if let Some(object) = db_object { + // object is old and should be refetched + if let Some(last_refreshed_at) = object.last_refreshed_at() { + if should_refetch_object(last_refreshed_at) { + return self + .dereference_from_http(data, local_actor, Some(object)) + .await; + } + } + Ok(object) + } + // object not found, need to fetch over http + else { + self.dereference_from_http(data, local_actor, None).await + } + } + + /// Fetch an object from the local db. Instead of falling back to http, this throws an error if + /// the object is not found in the database. + pub async fn dereference_local( + &self, + data: &Data<<Kind as Object>::DataType>, + ) -> Result<Kind, <Kind as Object>::Error> + where + <Kind as Object>::Error: From<Error>, + { + let object = self.dereference_from_db(data).await?; + object.ok_or_else(|| Error::NotFound.into()) + } + + /// returning none means the object was not found in local db + async fn dereference_from_db( + &self, + data: &Data<<Kind as Object>::DataType>, + ) -> Result<Option<Kind>, <Kind as Object>::Error> { + let id = self.0.clone(); + Object::read_from_id(*id, data).await + } + + async fn dereference_from_http( + &self, + data: &Data<<Kind as Object>::DataType>, + local_actor: &impl LocalActor, + db_object: Option<Kind>, + ) -> Result<Kind, <Kind as Object>::Error> + where + <Kind as Object>::Error: From<Error> + From<anyhow::Error>, + { + let res = fetch_object_http(&self.0, local_actor, data).await; + + if let Err(Error::ObjectDeleted) = &res { + if let Some(db_object) = db_object { + db_object.delete(data).await?; + } + return Err(anyhow!("Fetched remote object {} which was deleted", self).into()); + } + + let res2 = res?; + + Kind::verify(&res2, self.inner(), data).await?; + Kind::from_json(res2, data).await + } +} + +/// Need to implement clone manually, to avoid requiring Kind to be Clone +impl<Kind> Clone for ObjectId<Kind> +where + Kind: Object, + for<'de2> <Kind as Object>::Kind: serde::Deserialize<'de2>, +{ + fn clone(&self) -> Self { + ObjectId(self.0.clone(), self.1) + } +} + +static ACTOR_REFETCH_INTERVAL_SECONDS: i64 = 24 * 60 * 60; +static ACTOR_REFETCH_INTERVAL_SECONDS_DEBUG: i64 = 20; + +/// Determines when a remote actor should be refetched from its instance. In release builds, this is +/// `ACTOR_REFETCH_INTERVAL_SECONDS` after the last refetch, in debug builds +/// `ACTOR_REFETCH_INTERVAL_SECONDS_DEBUG`. +fn should_refetch_object(last_refreshed: NaiveDateTime) -> bool { + let update_interval = if cfg!(debug_assertions) { + // avoid infinite loop when fetching community outbox + ChronoDuration::seconds(ACTOR_REFETCH_INTERVAL_SECONDS_DEBUG) + } else { + ChronoDuration::seconds(ACTOR_REFETCH_INTERVAL_SECONDS) + }; + let refresh_limit = Utc::now().naive_utc() - update_interval; + last_refreshed.lt(&refresh_limit) +} + +impl<Kind> Display for ObjectId<Kind> +where + Kind: Object, + for<'de2> <Kind as Object>::Kind: serde::Deserialize<'de2>, +{ + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.0.as_str()) + } +} + +impl<Kind> Debug for ObjectId<Kind> +where + Kind: Object, + for<'de2> <Kind as Object>::Kind: serde::Deserialize<'de2>, +{ + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.0.as_str()) + } +} + +impl<Kind> From<ObjectId<Kind>> for Url +where + Kind: Object, + for<'de2> <Kind as Object>::Kind: serde::Deserialize<'de2>, +{ + fn from(id: ObjectId<Kind>) -> Self { + *id.0 + } +} + +impl<Kind> From<Url> for ObjectId<Kind> +where + Kind: Object + Send + 'static, + for<'de2> <Kind as Object>::Kind: serde::Deserialize<'de2>, +{ + fn from(url: Url) -> Self { + ObjectId(Box::new(url), PhantomData::<Kind>) + } +} + +impl<Kind> PartialEq for ObjectId<Kind> +where + Kind: Object, + for<'de2> <Kind as Object>::Kind: serde::Deserialize<'de2>, +{ + fn eq(&self, other: &Self) -> bool { + self.0.eq(&other.0) && self.1 == other.1 + } +} + +#[cfg(test)] +pub mod tests { + use super::*; + use crate::federation::{fetch::object_id::should_refetch_object, traits::tests::DbUser}; + + #[test] + fn test_deserialize() { + let id = ObjectId::<DbUser>::parse("http://test.com/").unwrap(); + + let string = serde_json::to_string(&id).unwrap(); + assert_eq!("\"http://test.com/\"", string); + + let parsed: ObjectId<DbUser> = serde_json::from_str(&string).unwrap(); + assert_eq!(parsed, id); + } + + #[test] + fn test_should_refetch_object() { + let one_second_ago = Utc::now().naive_utc() - ChronoDuration::seconds(1); + assert!(!should_refetch_object(one_second_ago)); + + let two_days_ago = Utc::now().naive_utc() - ChronoDuration::days(2); + assert!(should_refetch_object(two_days_ago)); + } +} diff --git a/packages/backend/crates/activitypub/src/federation/fetch/webfinger.rs b/packages/backend/crates/activitypub/src/federation/fetch/webfinger.rs new file mode 100644 index 0000000000..13620378a2 --- /dev/null +++ b/packages/backend/crates/activitypub/src/federation/fetch/webfinger.rs @@ -0,0 +1,227 @@ +// GNU Affero General Public License v3.0 +// https://github.com/LemmyNet/activitypub-federation-rust + +use crate::federation::{ + config::Data, + error::{Error, Error::WebfingerResolveFailed}, + fetch::{fetch_object_http, object_id::ObjectId}, + traits::{Actor, LocalActor, Object}, + FEDERATION_CONTENT_TYPE, +}; +use anyhow::anyhow; +use itertools::Itertools; +use regex::Regex; +use serde::{Deserialize, Serialize}; +use std::collections::HashMap; +use tracing::debug; +use url::Url; + +/// Takes an identifier of the form `name@example.com`, and returns an object of `Kind`. +/// +/// For this the identifier is first resolved via webfinger protocol to an Activitypub ID. This ID +/// is then fetched using [ObjectId::dereference], and the result returned. +/// +/// `instance_actor` will be used to sign fetch request, so it should be a system account with +/// the type of Application or Service. +pub async fn webfinger_resolve_actor<T: Clone, Kind>( + identifier: &str, + instance_actor: &impl LocalActor, + data: &Data<T>, +) -> Result<Kind, <Kind as Object>::Error> +where + Kind: Object + Actor + Send + 'static + Object<DataType = T>, + for<'de2> <Kind as Object>::Kind: serde::Deserialize<'de2>, + <Kind as Object>::Error: From<crate::federation::error::Error> + + From<anyhow::Error> + + From<url::ParseError> + + Send + + Sync, +{ + let (_, domain) = identifier + .splitn(2, '@') + .collect_tuple() + .ok_or(WebfingerResolveFailed)?; + let protocol = if data.config.debug { "http" } else { "https" }; + let fetch_url = + format!("{protocol}://{domain}/.well-known/webfinger?resource=acct:{identifier}"); + debug!("Fetching webfinger url: {}", &fetch_url); + + let res: Webfinger = fetch_object_http(&Url::parse(&fetch_url)?, instance_actor, data).await?; + + debug_assert_eq!(res.subject, format!("acct:{identifier}")); + let links: Vec<Url> = res + .links + .iter() + .filter(|link| { + if let Some(type_) = &link.kind { + type_.starts_with("application/") + } else { + false + } + }) + .filter_map(|l| l.href.clone()) + .collect(); + for l in links { + let object = ObjectId::<Kind>::from(l) + .dereference(instance_actor, data) + .await; + if object.is_ok() { + return object; + } + } + Err(WebfingerResolveFailed.into()) +} + +/// Extracts username from a webfinger resource parameter. +/// +/// Use this method for your HTTP handler at `.well-known/webfinger` to handle incoming webfinger +/// request. For a parameter of the form `acct:gargron@mastodon.social` it returns `gargron`. +/// +/// Returns an error if query doesn't match local domain. +pub fn extract_webfinger_name<T>(query: &str, data: &Data<T>) -> Result<String, Error> +where + T: Clone, +{ + // TODO: would be nice if we could implement this without regex and remove the dependency + // Regex taken from Mastodon - + // https://github.com/mastodon/mastodon/blob/2b113764117c9ab98875141bcf1758ba8be58173/app/models/account.rb#L65 + let regex = Regex::new(&format!( + "^acct:((?i)[a-z0-9_]+([a-z0-9_\\.-]+[a-z0-9_]+)?)@{}$", + data.domain() + )) + .map_err(Error::other)?; + Ok(regex + .captures(query) + .and_then(|c| c.get(1)) + .ok_or_else(|| Error::other(anyhow!("Webfinger regex failed to match")))? + .as_str() + .to_string()) +} + +/// Builds a basic webfinger response for the actor. +/// +/// It assumes that the given URL is valid both to the view the actor in a browser as HTML, and +/// for fetching it over Activitypub with `activity+json`. This setup is commonly used for ease +/// of discovery. +/// +/// ``` +/// # use url::Url; +/// # use activitypub_federation::fetch::webfinger::build_webfinger_response; +/// let subject = "acct:nutomic@lemmy.ml".to_string(); +/// let url = Url::parse("https://lemmy.ml/u/nutomic")?; +/// build_webfinger_response(subject, url); +/// # Ok::<(), anyhow::Error>(()) +/// ``` +pub fn build_webfinger_response(subject: String, url: Url) -> Webfinger { + build_webfinger_response_with_type(subject, vec![(url, None)]) +} + +/// Builds a webfinger response similar to `build_webfinger_response`. Use this when you want to +/// return multiple actors who share the same namespace and to specify the type of the actor. +/// +/// `urls` takes a vector of tuples. The first item of the tuple is the URL while the second +/// item is the type, such as `"Person"` or `"Group"`. If `None` is passed for the type, the field +/// will be empty. +/// +/// ``` +/// # use url::Url; +/// # use activitypub_federation::fetch::webfinger::build_webfinger_response_with_type; +/// let subject = "acct:nutomic@lemmy.ml".to_string(); +/// let user = Url::parse("https://lemmy.ml/u/nutomic")?; +/// let group = Url::parse("https://lemmy.ml/c/asklemmy")?; +/// build_webfinger_response_with_type(subject, vec![ +/// (user, Some("Person")), +/// (group, Some("Group"))]); +/// # Ok::<(), anyhow::Error>(()) +/// ``` +pub fn build_webfinger_response_with_type( + subject: String, + urls: Vec<(Url, Option<&str>)>, +) -> Webfinger { + Webfinger { + subject, + links: urls.iter().fold(vec![], |mut acc, (url, kind)| { + let properties: HashMap<Url, String> = kind + .map(|kind| { + HashMap::from([( + "https://www.w3.org/ns/activitystreams#type" + .parse() + .expect("parse url"), + kind.to_string(), + )]) + }) + .unwrap_or_default(); + let mut links = vec![ + WebfingerLink { + rel: Some("http://webfinger.net/rel/profile-page".to_string()), + kind: Some("text/html".to_string()), + href: Some(url.clone()), + properties: Default::default(), + }, + WebfingerLink { + rel: Some("self".to_string()), + kind: Some(FEDERATION_CONTENT_TYPE.to_string()), + href: Some(url.clone()), + properties, + }, + ]; + acc.append(&mut links); + acc + }), + aliases: vec![], + properties: Default::default(), + } +} + +/// A webfinger response with information about a `Person` or other type of actor. +#[derive(Serialize, Deserialize, Debug, Default)] +pub struct Webfinger { + /// The actor which is described here, for example `acct:LemmyDev@mastodon.social` + pub subject: String, + /// Links where further data about `subject` can be retrieved + pub links: Vec<WebfingerLink>, + /// Other Urls which identify the same actor as the `subject` + #[serde(default, skip_serializing_if = "Vec::is_empty")] + pub aliases: Vec<Url>, + /// Additional data about the subject + #[serde(default, skip_serializing_if = "HashMap::is_empty")] + pub properties: HashMap<Url, String>, +} + +/// A single link included as part of a [Webfinger] response. +#[derive(Serialize, Deserialize, Debug, Default)] +pub struct WebfingerLink { + /// Relationship of the link, such as `self` or `http://webfinger.net/rel/profile-page` + pub rel: Option<String>, + /// Media type of the target resource + #[serde(rename = "type")] + pub kind: Option<String>, + /// Url pointing to the target resource + pub href: Option<Url>, + /// Additional data about the link + #[serde(default, skip_serializing_if = "HashMap::is_empty")] + pub properties: HashMap<Url, String>, +} + +// #[cfg(test)] +// mod tests { +// use super::*; +// use crate::{ +// config::FederationConfig, +// traits::tests::{DbConnection, DbUser}, +// }; + +// #[actix_rt::test] +// async fn test_webfinger() { +// let config = FederationConfig::builder() +// .domain("example.com") +// .app_data(DbConnection) +// .build() +// .unwrap(); +// let data = config.to_request_data(); +// let res = +// webfinger_resolve_actor::<DbConnection, DbUser>("LemmyDev@mastodon.social", &data) +// .await; +// assert!(res.is_ok()); +// } +// } diff --git a/packages/backend/crates/activitypub/src/federation/http_signature.rs b/packages/backend/crates/activitypub/src/federation/http_signature.rs new file mode 100644 index 0000000000..df656110a9 --- /dev/null +++ b/packages/backend/crates/activitypub/src/federation/http_signature.rs @@ -0,0 +1,359 @@ +// GNU Affero General Public License v3.0 +// https://github.com/LemmyNet/activitypub-federation-rust + +//! Generating keypairs, creating and verifying signatures +//! +//! Signature creation and verification is handled internally in the library. See +//! [send_activity](crate::activity_queue::send_activity) and +//! [receive_activity (actix-web)](crate::actix_web::inbox::receive_activity) / +//! [receive_activity (axum)](crate::axum::inbox::receive_activity). + +use crate::federation::{ + error::{Error, Error::ActivitySignatureInvalid}, + protocol::public_key::main_key_id, +}; +use base64::{engine::general_purpose::STANDARD as Base64, Engine}; +use http::{header::HeaderName, uri::PathAndQuery, HeaderValue, Method, Uri}; +use http_signature_normalization_reqwest::prelude::{Config, SignExt}; +use once_cell::sync::Lazy; +use openssl::{ + hash::MessageDigest, + pkey::PKey, + rsa::Rsa, + sign::{Signer, Verifier}, +}; +use reqwest::Request; +use reqwest_middleware::RequestBuilder; +use sha2::{Digest, Sha256}; +use std::{collections::BTreeMap, fmt::Debug, io::ErrorKind}; +use tracing::debug; +use url::Url; + +/// A private/public key pair used for HTTP signatures +#[derive(Debug, Clone)] +pub struct Keypair { + /// Private key in PEM format + pub private_key: String, + /// Public key in PEM format + pub public_key: String, +} + +/// Generate a random asymmetric keypair for ActivityPub HTTP signatures. +pub fn generate_actor_keypair() -> Result<Keypair, std::io::Error> { + let rsa = Rsa::generate(2048)?; + let pkey = PKey::from_rsa(rsa)?; + let public_key = pkey.public_key_to_pem()?; + let private_key = pkey.private_key_to_pem_pkcs8()?; + let key_to_string = |key| match String::from_utf8(key) { + Ok(s) => Ok(s), + Err(e) => Err(std::io::Error::new( + ErrorKind::Other, + format!("Failed converting key to string: {}", e), + )), + }; + Ok(Keypair { + private_key: key_to_string(private_key)?, + public_key: key_to_string(public_key)?, + }) +} + +/// Creates an HTTP post request to `inbox_url`, with the given `client` and `headers`, and +/// `activity` as request body. The request is signed with `private_key` and then sent. +pub(crate) async fn sign_request( + request_builder: RequestBuilder, + actor_id: Url, + activity: String, + private_key: String, + http_signature_compat: bool, +) -> Result<Request, anyhow::Error> { + static CONFIG: Lazy<Config> = Lazy::new(Config::new); + static CONFIG_COMPAT: Lazy<Config> = Lazy::new(|| Config::new().mastodon_compat()); + + let key_id = main_key_id(&actor_id); + let sig_conf = match http_signature_compat { + false => CONFIG.clone(), + true => CONFIG_COMPAT.clone(), + }; + request_builder + .signature_with_digest( + sig_conf.clone(), + key_id, + Sha256::new(), + activity, + move |signing_string| { + let private_key = PKey::private_key_from_pem(private_key.as_bytes())?; + let mut signer = Signer::new(MessageDigest::sha256(), &private_key)?; + signer.update(signing_string.as_bytes())?; + + Ok(Base64.encode(signer.sign_to_vec()?)) as Result<_, anyhow::Error> + }, + ) + .await +} + +static CONFIG2: Lazy<http_signature_normalization::Config> = + Lazy::new(http_signature_normalization::Config::new); + +/// Verifies the HTTP signature on an incoming inbox request. +pub(crate) fn verify_signature<'a, H>( + headers: H, + method: &Method, + uri: &Uri, + public_key: &str, +) -> Result<(), Error> +where + H: IntoIterator<Item = (&'a HeaderName, &'a HeaderValue)>, +{ + let mut header_map = BTreeMap::<String, String>::new(); + for (name, value) in headers { + if let Ok(value) = value.to_str() { + header_map.insert(name.to_string(), value.to_string()); + } + } + let path_and_query = uri.path_and_query().map(PathAndQuery::as_str).unwrap_or(""); + + let verified = CONFIG2 + .begin_verify(method.as_str(), path_and_query, header_map) + .map_err(Error::other)? + .verify(|signature, signing_string| -> anyhow::Result<bool> { + debug!( + "Verifying with key {}, message {}", + &public_key, &signing_string + ); + let public_key = PKey::public_key_from_pem(public_key.as_bytes())?; + let mut verifier = Verifier::new(MessageDigest::sha256(), &public_key)?; + verifier.update(signing_string.as_bytes())?; + Ok(verifier.verify(&Base64.decode(signature)?)?) + }) + .map_err(Error::other)?; + + if verified { + debug!("verified signature for {}", uri); + Ok(()) + } else { + Err(ActivitySignatureInvalid) + } +} + +#[derive(Clone, Debug)] +struct DigestPart { + /// We assume that SHA256 is used which is the case with all major fediverse platforms + #[allow(dead_code)] + pub algorithm: String, + /// The hashsum + pub digest: String, +} + +impl DigestPart { + fn try_from_header(h: &HeaderValue) -> Option<Vec<DigestPart>> { + let h = h.to_str().ok()?.split(';').next()?; + let v: Vec<_> = h + .split(',') + .filter_map(|p| { + let mut iter = p.splitn(2, '='); + iter.next() + .and_then(|alg| iter.next().map(|value| (alg, value))) + }) + .map(|(alg, value)| DigestPart { + algorithm: alg.to_owned(), + digest: value.to_owned(), + }) + .collect(); + + if v.is_empty() { + None + } else { + Some(v) + } + } +} + +/// Verify body of an inbox request against the hash provided in `Digest` header. +pub(crate) fn verify_inbox_hash( + digest_header: Option<&HeaderValue>, + body: &[u8], +) -> Result<(), Error> { + let digest = digest_header + .and_then(DigestPart::try_from_header) + .ok_or(Error::ActivityBodyDigestInvalid)?; + let mut hasher = Sha256::new(); + + for part in digest { + hasher.update(body); + if Base64.encode(hasher.finalize_reset()) != part.digest { + return Err(Error::ActivityBodyDigestInvalid); + } + } + + Ok(()) +} + +#[cfg(test)] +pub mod test { + use crate::federation::fetch::generate_request_headers; + + use super::*; + use reqwest::Client; + use reqwest_middleware::ClientWithMiddleware; + use std::str::FromStr; + + static ACTOR_ID: Lazy<Url> = Lazy::new(|| Url::parse("https://example.com/u/alice").unwrap()); + static REMOTE_ID: Lazy<Url> = Lazy::new(|| Url::parse("https://example.com/u/bob").unwrap()); + static INBOX_URL: Lazy<Url> = + Lazy::new(|| Url::parse("https://example.com/u/alice/inbox").unwrap()); + + #[tokio::test] + async fn test_sign() { + let mut headers = generate_request_headers(&INBOX_URL); + // use hardcoded date in order to test against hardcoded signature + headers.insert( + "date", + HeaderValue::from_str("Tue, 28 Mar 2023 21:03:44 GMT").unwrap(), + ); + + let request_builder = ClientWithMiddleware::from(Client::new()) + .post(INBOX_URL.to_string()) + .headers(headers); + let request = sign_request( + request_builder, + ACTOR_ID.clone(), + "my activity".to_string(), + test_keypair().private_key, + // set this to prevent created/expires headers to be generated and inserted + // automatically from current time + true, + ) + .await + .unwrap(); + let signature = request + .headers() + .get("signature") + .unwrap() + .to_str() + .unwrap(); + let expected_signature = concat!( + "keyId=\"https://example.com/u/alice#main-key\",", + "algorithm=\"hs2019\",", + "headers=\"(request-target) content-type date digest host\",", + "signature=\"BpZhHNqzd6d6jhWOxyJ0jXwWWxiKMNK7i3mrr/5mVFnH7fUpicwqw8cSYVr", + "cwWjt0I07HW7rZFUfIdSgCoOEdvxtrccF/hTrwYgm8O6SQRHl1UfFtDR6e9EpfPieVmTjo0", + "QVfyzLLa41rmnz/yFqqer/v0kcdED51/dGe8NCGPBbhgK6C4oh7r+XHsQZMIhh38BcfZVWN", + "YaMqgyhFxu2f34IKnOEk6NjSaNtO+PzQUhbksTvH0Vvi6R0dtQINJFdONVBl4AwDC1INeF5", + "uhQo/SaKHfP3UitUHdM5Pbn+LhZYDB9AaQAW5ZGD43Aw15ecwsnKi4HcjV8nBw4zehlvaQ==\"" + ); + assert_eq!(signature, expected_signature); + } + + #[tokio::test] + async fn test_verify_post() { + let headers = generate_request_headers(&INBOX_URL); + let request_builder = ClientWithMiddleware::from(Client::new()) + .post(INBOX_URL.to_string()) + .headers(headers); + let request = sign_request( + request_builder, + ACTOR_ID.clone(), + "my activity".to_string(), + test_keypair().private_key, + false, + ) + .await + .unwrap(); + + let valid = verify_signature( + request.headers(), + request.method(), + &Uri::from_str(request.url().as_str()).unwrap(), + &test_keypair().public_key, + ); + println!("{:?}", &valid); + assert!(valid.is_ok()); + } + + #[tokio::test] + async fn test_verify_get() { + let headers = generate_request_headers(&REMOTE_ID); + let request_builder = ClientWithMiddleware::from(Client::new()) + .get(REMOTE_ID.to_string()) + .headers(headers); + let request = sign_request( + request_builder, + ACTOR_ID.to_owned(), + String::new(), + test_keypair().private_key, + false, + ) + .await + .unwrap(); + + let valid = verify_signature( + request.headers(), + request.method(), + &Uri::from_str(request.url().as_str()).unwrap(), + &test_keypair().public_key, + ); + println!("{:?}", &valid); + assert!(valid.is_ok()); + } + + #[test] + fn test_verify_inbox_hash_valid() { + let digest_header = + HeaderValue::from_static("SHA-256=lzFT+G7C2hdI5j8M+FuJg1tC+O6AGMVJhooTCKGfbKM="); + let body = "Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua."; + let valid = verify_inbox_hash(Some(&digest_header), body.as_bytes()); + println!("{:?}", &valid); + assert!(valid.is_ok()); + } + + #[test] + fn test_verify_inbox_hash_not_valid() { + let digest_header = + HeaderValue::from_static("SHA-256=Z9h7DJfYWjffXw2XftmWCnpEaK/yqOHKvzCIzIaqgbU="); + let body = "lorem ipsum"; + let invalid = verify_inbox_hash(Some(&digest_header), body.as_bytes()); + assert_eq!(invalid, Err(Error::ActivityBodyDigestInvalid)); + } + + pub fn test_keypair() -> Keypair { + let rsa = Rsa::private_key_from_pem(PRIVATE_KEY.as_bytes()).unwrap(); + let pkey = PKey::from_rsa(rsa).unwrap(); + let private_key = pkey.private_key_to_pem_pkcs8().unwrap(); + let public_key = pkey.public_key_to_pem().unwrap(); + Keypair { + private_key: String::from_utf8(private_key).unwrap(), + public_key: String::from_utf8(public_key).unwrap(), + } + } + + /// Hardcoded private key so that signature doesn't change across runs + const PRIVATE_KEY: &str = concat!( + "-----BEGIN RSA PRIVATE KEY-----\n", + "MIIEogIBAAKCAQEA2kZpsvWYrwM9zMQiDwo4k6/VfpK2aDTeVe9ZkcvDrrWfqt72\n", + "QSjjtXLa8sxJlEn+/zbnZ1lG3AO/WsKs2jiOycNQHBS1ITnSZKEpdKnAoLUn4k16\n", + "YivRmALyLedOfIrvMtQzH8a+kOQ71u2Wa3H9jpkCT5W9OneEBa3VjQp49kcrF3tm\n", + "mrEUhfai5GJM4xrdr587y7exkBF4wObepta9opSeuBkPV4QXZPfgmjwW+oOTheVH\n", + "6L7yjzvjW92j4/T6XKAcu0kn/aQhR8SiGtPBMyOlcW4S2eDHWf1RlqbNGb5L9Qam\n", + "fb0WAymx0ANLUDQyXAu5zViMrd4g8mgdkg7C1wIDAQABAoIBAAHAT0Uvsguz0Frq\n", + "0Li8+A4I4U/RQeqW6f9XtHWpl3NSYuqOPJZY2DxypHRB1Iex13x/gBHH/8jwgShR\n", + "2x/3ev9kmsLu6f+CcdniCFQdFiRaVh/IFI0Ve7cz5tkcoiuSB2NDNcaYFwIdYqfr\n", + "Ytz2OCn2hLQHKB9M9pLMSnDsPmMAOveY11XfhkECrWlh1bx9YPyJScnNKTblB3M+\n", + "GhYL3xzuCxPCC9nUfqz7Y8FnZTCmePOwcRflJDTLFs6Bqkv1PZOZWzI+7akaJxfI\n", + "SOSw3VkGegsdoGVgHobqT2tqL8vuKM1bs47PFwWjVCGEoOvcC/Ha1+INemWbh7VA\n", + "Xa/jvxkCgYEA/+AxeMCLCmH/F696W3RpPdFL25wSYQr1auV2xRfmsT+hhpSp3yz/\n", + "ypkazS9TbnSCm18up+jE9rJ1c9VIZrgcTeKzPURzE68RR8uOsa9o9kaUzfyvRAzb\n", + "fmQXMvv2rmm9U7srhjpvKo1BcHpQIQYToKt0TOv7soSEY2jGNvaK6i0CgYEA2mGL\n", + "sL36WoHF3x2DZNvknLJGjxPSMmdjjfflFRqxKeP+Sf54C4QH/1hxHe/yl/KMBTfa\n", + "woBl05SrwTnQ7bOeR8VTmzP53JfkECT5I9h/g8vT8dkz5WQXWNDgy61Imq/UmWwm\n", + "DHElGrkF31oy5w6+aZ58Sa5bXhBDYpkUP9+pV5MCgYAW5BCo89i8gg3XKZyxp9Vu\n", + "cVXu/KRsSBWyjXq1oTDDNKUXrB8SVy0/C7lpF83H+OZiTf6XiOxuAYMebLtAbUIi\n", + "+Z/9YC1HWocaPCy02rNyLNhNIUjwtpHAWeX1arMj4VPNtNXs+TdOwDpVfKvEeI2y\n", + "9wO9ifMHgnFxj0MEUcQVtQKBgHg2Mhs8uM+RmEbVjDq9AP9w835XPuIYH6lKyIPx\n", + "iYyxwI0i0xojt/NL0BjWuQgDsCg/MuDWpTbvJAzdsrDmqz5+1SMeXXCc/CIW+D5P\n", + "MwJt9WGwWuzvSBrQAK6d2NWt7K335on6zp4DM8RbdqHSb+bcIza8D/ebpDxmX8s5\n", + "Z5KZAoGAX8u+63w1uy1FLhf48SqmjOqkAjdUZCWEmaim69koAOdTIBSSDOnAqzGu\n", + "wIVdLLzI6xTgbYmfErCwpU2v8MfUWr0BDzjQ9G6c5rhcS1BkfxbeAsC42XaVIgCk\n", + "2sMNMqi6f96jbp4IQI70BpecsnBAUa+VoT57bZRvy0lW26w9tYI=\n", + "-----END RSA PRIVATE KEY-----\n" + ); +} diff --git a/packages/backend/crates/activitypub/src/federation/mod.rs b/packages/backend/crates/activitypub/src/federation/mod.rs index 454641e63a..09855bd6b1 100644 --- a/packages/backend/crates/activitypub/src/federation/mod.rs +++ b/packages/backend/crates/activitypub/src/federation/mod.rs @@ -1,3 +1,10 @@ -mod config; -mod error; -mod protocol; +pub(crate) mod config; +pub(crate) mod error; +pub(crate) mod fetch; +pub(crate) mod http_signature; +pub(crate) mod protocol; +pub(crate) mod reqwest_shim; +pub(crate) mod traits; + +/// Mime type for Activitypub data, used for `Accept` and `Content-Type` HTTP headers +pub static FEDERATION_CONTENT_TYPE: &str = "application/activity+json"; diff --git a/packages/backend/crates/activitypub/src/federation/protocol/context.rs b/packages/backend/crates/activitypub/src/federation/protocol/context.rs index 47cbd5a17f..33583cca82 100644 --- a/packages/backend/crates/activitypub/src/federation/protocol/context.rs +++ b/packages/backend/crates/activitypub/src/federation/protocol/context.rs @@ -22,8 +22,9 @@ //! Ok::<(), serde_json::error::Error>(()) //! ``` -use crate::federation::protocol::helper::deserialize_one_or_many; -use crate::{config::Data, traits::ActivityHandler}; +use crate::federation::{ + config::Data, protocol::helper::deserialize_one_or_many, traits::ActivityHandler, +}; use serde::{Deserialize, Serialize}; use serde_json::Value; use url::Url; diff --git a/packages/backend/crates/activitypub/src/federation/protocol/helper.rs b/packages/backend/crates/activitypub/src/federation/protocol/helper.rs index 32abcbda38..65eb64104f 100644 --- a/packages/backend/crates/activitypub/src/federation/protocol/helper.rs +++ b/packages/backend/crates/activitypub/src/federation/protocol/helper.rs @@ -29,7 +29,7 @@ use serde::{Deserialize, Deserializer}; /// ]}"#)?; /// assert_eq!(multiple.to.len(), 2); /// Ok::<(), anyhow::Error>(()) -pub(crate) fn deserialize_one_or_many<'de, T, D>(deserializer: D) -> Result<Vec<T>, D::Error> +pub fn deserialize_one_or_many<'de, T, D>(deserializer: D) -> Result<Vec<T>, D::Error> where T: Deserialize<'de>, D: Deserializer<'de>, @@ -64,7 +64,7 @@ where /// /// let note = serde_json::from_str::<Note>(r#"{"to": ["https://example.com/u/alice"] }"#); /// assert!(note.is_ok()); -pub(crate) fn deserialize_one<'de, T, D>(deserializer: D) -> Result<T, D::Error> +pub fn deserialize_one<'de, T, D>(deserializer: D) -> Result<T, D::Error> where T: Deserialize<'de>, D: Deserializer<'de>, @@ -109,7 +109,7 @@ where /// }"#); /// assert_eq!(note.unwrap().source, None); /// # Ok::<(), anyhow::Error>(()) -pub(crate) fn deserialize_skip_error<'de, T, D>(deserializer: D) -> Result<T, D::Error> +pub fn deserialize_skip_error<'de, T, D>(deserializer: D) -> Result<T, D::Error> where T: Deserialize<'de> + Default, D: Deserializer<'de>, diff --git a/packages/backend/crates/activitypub/src/federation/protocol/mod.rs b/packages/backend/crates/activitypub/src/federation/protocol/mod.rs index c3291abc65..39b6ed35c4 100644 --- a/packages/backend/crates/activitypub/src/federation/protocol/mod.rs +++ b/packages/backend/crates/activitypub/src/federation/protocol/mod.rs @@ -1,5 +1,5 @@ -mod context; -mod helper; -mod kind; -mod public_key; -mod verification; +pub(crate) mod context; +pub(crate) mod helper; +pub(crate) mod kind; +pub(crate) mod public_key; +pub(crate) mod verification; diff --git a/packages/backend/crates/activitypub/src/federation/protocol/public_key.rs b/packages/backend/crates/activitypub/src/federation/protocol/public_key.rs index 9d3f106c94..4408898922 100644 --- a/packages/backend/crates/activitypub/src/federation/protocol/public_key.rs +++ b/packages/backend/crates/activitypub/src/federation/protocol/public_key.rs @@ -11,7 +11,7 @@ use url::Url; /// This needs to be federated in the `public_key` field of all actors. #[derive(Clone, Debug, Deserialize, Serialize)] #[serde(rename_all = "camelCase")] -pub(crate) struct PublicKey { +pub struct PublicKey { /// Id of this private key. pub id: String, /// ID of the actor that this public key belongs to @@ -24,7 +24,7 @@ impl PublicKey { /// Create a new [PublicKey] struct for the `owner` with `public_key_pem`. /// /// It uses an standard key id of `{actor_id}#main-key` - pub(crate) fn new(owner: Url, public_key_pem: String) -> Self { + pub fn new(owner: Url, public_key_pem: String) -> Self { let id = main_key_id(&owner); PublicKey { id, @@ -34,6 +34,6 @@ impl PublicKey { } } -pub(crate) fn main_key_id(owner: &Url) -> String { +pub fn main_key_id(owner: &Url) -> String { format!("{}#main-key", &owner) } diff --git a/packages/backend/crates/activitypub/src/federation/protocol/verification.rs b/packages/backend/crates/activitypub/src/federation/protocol/verification.rs index 50fbca0926..aea5246d79 100644 --- a/packages/backend/crates/activitypub/src/federation/protocol/verification.rs +++ b/packages/backend/crates/activitypub/src/federation/protocol/verification.rs @@ -18,7 +18,7 @@ use url::Url; /// ``` pub fn verify_domains_match(a: &Url, b: &Url) -> Result<(), Error> { if a.domain() != b.domain() { - return Err(Error::UrlVerificationError); + return Err(Error::UrlVerificationError("Domains do not match")); } Ok(()) } @@ -35,7 +35,7 @@ pub fn verify_domains_match(a: &Url, b: &Url) -> Result<(), Error> { /// ``` pub fn verify_urls_match(a: &Url, b: &Url) -> Result<(), Error> { if a != b { - return Err(Error::UrlVerificationError); + return Err(Error::UrlVerificationError("Urls do not match")); } Ok(()) } diff --git a/packages/backend/crates/activitypub/src/federation/reqwest_shim.rs b/packages/backend/crates/activitypub/src/federation/reqwest_shim.rs new file mode 100644 index 0000000000..9b7a13987f --- /dev/null +++ b/packages/backend/crates/activitypub/src/federation/reqwest_shim.rs @@ -0,0 +1,136 @@ +use crate::federation::error::Error; +use bytes::{BufMut, Bytes, BytesMut}; +use futures_core::{ready, stream::BoxStream, Stream}; +use pin_project_lite::pin_project; +use reqwest::Response; +use serde::de::DeserializeOwned; +use std::{ + future::Future, + marker::PhantomData, + mem, + pin::Pin, + task::{Context, Poll}, +}; + +/// 100KB +const MAX_BODY_SIZE: usize = 102400; + +pin_project! { + pub struct BytesFuture { + #[pin] + stream: BoxStream<'static, reqwest::Result<Bytes>>, + limit: usize, + aggregator: BytesMut, + } +} + +impl Future for BytesFuture { + type Output = Result<Bytes, Error>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + loop { + let this = self.as_mut().project(); + if let Some(chunk) = ready!(this.stream.poll_next(cx)) + .transpose() + .map_err(Error::other)? + { + this.aggregator.put(chunk); + if this.aggregator.len() > *this.limit { + return Poll::Ready(Err(Error::ResponseBodyLimit)); + } + + continue; + } + + break; + } + + Poll::Ready(Ok(mem::take(&mut self.aggregator).freeze())) + } +} + +pin_project! { + pub struct JsonFuture<T> { + _t: PhantomData<T>, + #[pin] + future: BytesFuture, + } +} + +impl<T> Future for JsonFuture<T> +where + T: DeserializeOwned, +{ + type Output = Result<T, Error>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + let this = self.project(); + let bytes = ready!(this.future.poll(cx))?; + Poll::Ready(serde_json::from_slice(&bytes).map_err(Error::other)) + } +} + +pin_project! { + pub struct TextFuture { + #[pin] + future: BytesFuture, + } +} + +impl Future for TextFuture { + type Output = Result<String, Error>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + let this = self.project(); + let bytes = ready!(this.future.poll(cx))?; + Poll::Ready(String::from_utf8(bytes.to_vec()).map_err(Error::other)) + } +} + +/// Response shim to work around [an issue in reqwest](https://github.com/seanmonstar/reqwest/issues/1234) (there is an [open pull request](https://github.com/seanmonstar/reqwest/pull/1532) fixing this). +/// +/// Reqwest doesn't limit the response body size by default nor does it offer an option to configure one. +/// Since we have to fetch data from untrusted sources, not restricting the maximum size is a DoS hazard for us. +/// +/// This shim reimplements the `bytes`, `json`, and `text` functions and restricts the bodies to 100KB. +/// +/// TODO: Remove this shim as soon as reqwest gets support for size-limited bodies. +pub trait ResponseExt { + type BytesFuture; + type JsonFuture<T>; + type TextFuture; + + /// Size limited version of `bytes` to work around a reqwest issue. Check [`ResponseExt`] docs for details. + fn bytes_limited(self) -> Self::BytesFuture; + /// Size limited version of `json` to work around a reqwest issue. Check [`ResponseExt`] docs for details. + fn json_limited<T>(self) -> Self::JsonFuture<T>; + /// Size limited version of `text` to work around a reqwest issue. Check [`ResponseExt`] docs for details. + fn text_limited(self) -> Self::TextFuture; +} + +impl ResponseExt for Response { + type BytesFuture = BytesFuture; + type JsonFuture<T> = JsonFuture<T>; + type TextFuture = TextFuture; + + fn bytes_limited(self) -> Self::BytesFuture { + BytesFuture { + stream: Box::pin(self.bytes_stream()), + limit: MAX_BODY_SIZE, + aggregator: BytesMut::new(), + } + } + + fn json_limited<T>(self) -> Self::JsonFuture<T> { + JsonFuture { + _t: PhantomData, + future: self.bytes_limited(), + } + } + + fn text_limited(self) -> Self::TextFuture { + TextFuture { + future: self.bytes_limited(), + } + } +} diff --git a/packages/backend/crates/activitypub/src/federation/traits.rs b/packages/backend/crates/activitypub/src/federation/traits.rs new file mode 100644 index 0000000000..14750e7b4d --- /dev/null +++ b/packages/backend/crates/activitypub/src/federation/traits.rs @@ -0,0 +1,552 @@ +// GNU Affero General Public License v3.0 +// https://github.com/LemmyNet/activitypub-federation-rust + +//! Traits which need to be implemented for federated data types + +use crate::federation::{config::Data, protocol::public_key::PublicKey}; +use async_trait::async_trait; +use chrono::NaiveDateTime; +use serde::Deserialize; +use std::ops::Deref; +use url::Url; + +/// Helper for converting between database structs and federated protocol structs. +/// +/// ``` +/// # use activitystreams_kinds::{object::NoteType, public}; +/// # use chrono::{Local, NaiveDateTime}; +/// # use serde::{Deserialize, Serialize}; +/// # use url::Url; +/// # use activitypub_federation::protocol::{public_key::PublicKey, helpers::deserialize_one_or_many}; +/// # use activitypub_federation::config::Data; +/// # use activitypub_federation::fetch::object_id::ObjectId; +/// # use activitypub_federation::protocol::verification::verify_domains_match; +/// # use activitypub_federation::traits::{Actor, Object}; +/// # use activitypub_federation::traits::tests::{DbConnection, DbUser}; +/// # +/// /// How the post is read/written in the local database +/// pub struct DbPost { +/// pub text: String, +/// pub ap_id: ObjectId<DbPost>, +/// pub creator: ObjectId<DbUser>, +/// pub local: bool, +/// } +/// +/// /// How the post is serialized and represented as Activitypub JSON +/// #[derive(Deserialize, Serialize, Debug)] +/// #[serde(rename_all = "camelCase")] +/// pub struct Note { +/// #[serde(rename = "type")] +/// kind: NoteType, +/// id: ObjectId<DbPost>, +/// pub(crate) attributed_to: ObjectId<DbUser>, +/// #[serde(deserialize_with = "deserialize_one_or_many")] +/// pub(crate) to: Vec<Url>, +/// content: String, +/// } +/// +/// #[async_trait::async_trait] +/// impl Object for DbPost { +/// type DataType = DbConnection; +/// type Kind = Note; +/// type Error = anyhow::Error; +/// +/// async fn read_from_id(object_id: Url, data: &Data<Self::DataType>) -> Result<Option<Self>, Self::Error> { +/// // Attempt to read object from local database. Return Ok(None) if not found. +/// let post: Option<DbPost> = data.read_post_from_json_id(object_id).await?; +/// Ok(post) +/// } +/// +/// async fn into_json(self, data: &Data<Self::DataType>) -> Result<Self::Kind, Self::Error> { +/// // Called when a local object gets sent out over Activitypub. Simply convert it to the +/// // protocol struct +/// Ok(Note { +/// kind: Default::default(), +/// id: self.ap_id.clone().into(), +/// attributed_to: self.creator, +/// to: vec![public()], +/// content: self.text, +/// }) +/// } +/// +/// async fn verify(json: &Self::Kind, expected_domain: &Url, data: &Data<Self::DataType>,) -> Result<(), Self::Error> { +/// verify_domains_match(json.id.inner(), expected_domain)?; +/// // additional application specific checks +/// Ok(()) +/// } +/// +/// async fn from_json(json: Self::Kind, data: &Data<Self::DataType>) -> Result<Self, Self::Error> { +/// // Called when a remote object gets received over Activitypub. Validate and insert it +/// // into the database. +/// +/// let post = DbPost { +/// text: json.content, +/// ap_id: json.id, +/// creator: json.attributed_to, +/// local: false, +/// }; +/// +/// // Here we need to persist the object in the local database. Note that Activitypub +/// // doesnt distinguish between creating and updating an object. Thats why we need to +/// // use upsert functionality. +/// data.upsert(&post).await?; +/// +/// Ok(post) +/// } +/// +/// } +#[async_trait] +pub trait Object: Sized { + /// App data type passed to handlers. Must be identical to + /// [crate::config::FederationConfigBuilder::app_data] type. + type DataType: Clone + Send + Sync; + /// The type of protocol struct which gets sent over network to federate this database struct. + type Kind; + /// Error type returned by handler methods + type Error; + + /// Returns the last time this object was updated. + /// + /// If this returns `Some` and the value is too long ago, the object is refetched from the + /// original instance. This should always be implemented for actors, because there is no active + /// update mechanism prescribed. It is possible to send `Update/Person` activities for profile + /// changes, but not all implementations do this, so `last_refreshed_at` is still necessary. + /// + /// The object is refetched if `last_refreshed_at` value is more than 24 hours ago. In debug + /// mode this is reduced to 20 seconds. + fn last_refreshed_at(&self) -> Option<NaiveDateTime> { + None + } + + /// Try to read the object with given `id` from local database. + /// + /// Should return `Ok(None)` if not found. + async fn read_from_id( + object_id: Url, + data: &Data<Self::DataType>, + ) -> Result<Option<Self>, Self::Error>; + + /// Mark remote object as deleted in local database. + /// + /// Called when a `Delete` activity is received, or if fetch returns a `Tombstone` object. + async fn delete(self, _data: &Data<Self::DataType>) -> Result<(), Self::Error> { + Ok(()) + } + + /// Convert database type to Activitypub type. + /// + /// Called when a local object gets fetched by another instance over HTTP, or when an object + /// gets sent in an activity. + async fn into_json(self, data: &Data<Self::DataType>) -> Result<Self::Kind, Self::Error>; + + /// Verifies that the received object is valid. + /// + /// You should check here that the domain of id matches `expected_domain`. Additionally you + /// should perform any application specific checks. + /// + /// It is necessary to use a separate method for this, because it might be used for activities + /// like `Delete/Note`, which shouldn't perform any database write for the inner `Note`. + async fn verify( + json: &Self::Kind, + expected_domain: &Url, + data: &Data<Self::DataType>, + ) -> Result<(), Self::Error>; + + /// Convert object from ActivityPub type to database type. + /// + /// Called when an object is received from HTTP fetch or as part of an activity. This method + /// should write the received object to database. Note that there is no distinction between + /// create and update, so an `upsert` operation should be used. + async fn from_json(json: Self::Kind, data: &Data<Self::DataType>) -> Result<Self, Self::Error>; +} + +/// Handler for receiving incoming activities. +/// +/// ```rust +/// # use activitystreams_kinds::activity::FollowType; +/// # use url::Url; +/// # use activitypub_federation::fetch::object_id::ObjectId; +/// # use activitypub_federation::config::Data; +/// # use activitypub_federation::traits::ActivityHandler; +/// # use activitypub_federation::traits::tests::{DbConnection, DbUser, DB_LOCAL_USER}; +/// #[derive(serde::Deserialize)] +/// struct Follow { +/// actor: ObjectId<DbUser>, +/// object: ObjectId<DbUser>, +/// #[serde(rename = "type")] +/// kind: FollowType, +/// id: Url, +/// } +/// +/// #[async_trait::async_trait] +/// impl ActivityHandler for Follow { +/// type DataType = DbConnection; +/// type Error = anyhow::Error; +/// +/// fn id(&self) -> &Url { +/// &self.id +/// } +/// +/// fn actor(&self) -> &Url { +/// self.actor.inner() +/// } +/// +/// async fn verify(&self, data: &Data<Self::DataType>) -> Result<(), Self::Error> { +/// Ok(()) +/// } +/// +/// async fn receive(self, data: &Data<Self::DataType>) -> Result<(), Self::Error> { +/// let local_user = self.object.dereference(&DB_LOCAL_USER.clone(), data).await?; +/// let follower = self.actor.dereference(&DB_LOCAL_USER.clone(), data).await?; +/// data.add_follower(local_user, follower).await?; +/// Ok(()) +/// } +/// } +/// ``` +#[async_trait] +#[enum_delegate::register] +pub trait ActivityHandler { + /// App data type passed to handlers. Must be identical to + /// [crate::config::FederationConfigBuilder::app_data] type. + type DataType: Clone + Send + Sync; + /// Error type returned by handler methods + type Error; + + /// `id` field of the activity + fn id(&self) -> &Url; + + /// `actor` field of activity + fn actor(&self) -> &Url; + + /// Verifies that the received activity is valid. + /// + /// This needs to be a separate method, because it might be used for activities + /// like `Undo/Follow`, which shouldn't perform any database write for the inner `Follow`. + async fn verify(&self, data: &Data<Self::DataType>) -> Result<(), Self::Error>; + + /// Called when an activity is received. + /// + /// Should perform validation and possibly write action to the database. In case the activity + /// has a nested `object` field, must call `object.from_json` handler. + async fn receive(self, data: &Data<Self::DataType>) -> Result<(), Self::Error>; +} + +/// Trait to allow retrieving common Actor data. +pub trait Actor: Object + Send + 'static { + /// `id` field of the actor + fn id(&self) -> Url; + + /// The actor's public key for verifying signatures of incoming activities. + /// + /// Use [generate_actor_keypair](crate::http_signatures::generate_actor_keypair) to create the + /// actor keypair. + fn public_key_pem(&self) -> &str; + + /// The inbox where activities for this user should be sent to + fn inbox(&self) -> Url; + + /// Generates a public key struct for use in the actor json representation + fn public_key(&self) -> PublicKey { + PublicKey::new(self.id(), self.public_key_pem().to_string()) + } + + /// The actor's shared inbox, if any + fn shared_inbox(&self) -> Option<Url> { + None + } + + /// Returns shared inbox if it exists, normal inbox otherwise. + fn shared_inbox_or_inbox(&self) -> Url { + self.shared_inbox().unwrap_or_else(|| self.inbox()) + } +} + +/// Trait to represent a local actor. +pub trait LocalActor { + /// Federation ID (URL) of the actor + fn federation_id(&self) -> Url; + + /// The local actor's private key for signing outgoing activities and requests. + /// + /// Use [generate_actor_keypair](crate::http_signatures::generate_actor_keypair) to create the + /// actor keypair. + fn private_key_pem(&self) -> &str; +} + +/// Allow for boxing of enum variants +#[async_trait] +impl<T> ActivityHandler for Box<T> +where + T: ActivityHandler + Send + Sync, +{ + type DataType = T::DataType; + type Error = T::Error; + + fn id(&self) -> &Url { + self.deref().id() + } + + fn actor(&self) -> &Url { + self.deref().actor() + } + + async fn verify(&self, data: &Data<Self::DataType>) -> Result<(), Self::Error> { + self.deref().verify(data).await + } + + async fn receive(self, data: &Data<Self::DataType>) -> Result<(), Self::Error> { + (*self).receive(data).await + } +} + +/// Trait for federating collections +#[async_trait] +pub trait Collection: Sized { + /// Actor or object that this collection belongs to + type Owner; + /// App data type passed to handlers. Must be identical to + /// [crate::config::FederationConfigBuilder::app_data] type. + type DataType: Clone + Send + Sync; + /// The type of protocol struct which gets sent over network to federate this database struct. + type Kind: for<'de2> Deserialize<'de2>; + /// Error type returned by handler methods + type Error; + + /// Reads local collection from database and returns it as Activitypub JSON. + async fn read_local( + owner: &Self::Owner, + data: &Data<Self::DataType>, + ) -> Result<Self::Kind, Self::Error>; + + /// Verifies that the received object is valid. + /// + /// You should check here that the domain of id matches `expected_domain`. Additionally you + /// should perform any application specific checks. + async fn verify( + json: &Self::Kind, + expected_domain: &Url, + data: &Data<Self::DataType>, + ) -> Result<(), Self::Error>; + + /// Convert object from ActivityPub type to database type. + /// + /// Called when an object is received from HTTP fetch or as part of an activity. This method + /// should also write the received object to database. Note that there is no distinction + /// between create and update, so an `upsert` operation should be used. + async fn from_json( + json: Self::Kind, + owner: &Self::Owner, + data: &Data<Self::DataType>, + ) -> Result<Self, Self::Error>; +} + +#[cfg(test)] +pub(crate) mod tests { + use super::*; + use crate::federation::{ + fetch::object_id::ObjectId, + http_signature::{generate_actor_keypair, Keypair}, + protocol::{kind, public_key::PublicKey, verification::verify_domains_match}, + }; + use anyhow::Error; + use once_cell::sync::Lazy; + use serde::{Deserialize, Serialize}; + + #[derive(Clone)] + pub struct DbConnection; + + #[derive(Clone, Debug, Deserialize, Serialize)] + #[serde(rename_all = "camelCase")] + pub(crate) struct PersonActor { + #[serde(rename = "type")] + pub kind: kind::ActorType, + pub preferred_username: String, + pub id: ObjectId<DbUser>, + pub inbox: Url, + pub public_key: PublicKey, + } + #[derive(Debug, Clone)] + pub(crate) struct DbUser { + pub name: String, + pub federation_id: Url, + pub inbox: Url, + pub public_key: String, + #[allow(dead_code)] + private_key: Option<String>, + pub followers: Vec<Url>, + pub local: bool, + } + + #[derive(Debug, Clone)] + pub struct DbLocalUser(DbUser); + + pub static DB_USER_KEYPAIR: Lazy<Keypair> = Lazy::new(|| generate_actor_keypair().unwrap()); + pub static DB_LOCAL_USER_KEYPAIR: Lazy<Keypair> = + Lazy::new(|| generate_actor_keypair().unwrap()); + + pub(crate) static DB_USER: Lazy<DbUser> = Lazy::new(|| DbUser { + name: String::new(), + federation_id: "https://example.com/123".parse().unwrap(), + inbox: "https://example.com/123/inbox".parse().unwrap(), + public_key: DB_USER_KEYPAIR.public_key.clone(), + private_key: None, + followers: vec![], + local: false, + }); + + pub static DB_LOCAL_USER: Lazy<DbLocalUser> = Lazy::new(|| { + DbLocalUser(DbUser { + name: String::new(), + federation_id: "https://localhost/456".parse().unwrap(), + inbox: "https://localhost/456/inbox".parse().unwrap(), + public_key: DB_LOCAL_USER_KEYPAIR.public_key.clone(), + private_key: Some(DB_LOCAL_USER_KEYPAIR.private_key.clone()), + followers: vec![], + local: true, + }) + }); + + #[async_trait] + impl Object for DbUser { + type DataType = DbConnection; + type Kind = PersonActor; + type Error = Error; + + async fn read_from_id( + _object_id: Url, + _data: &Data<Self::DataType>, + ) -> Result<Option<Self>, Self::Error> { + Ok(Some(DB_USER.clone())) + } + + async fn into_json(self, _data: &Data<Self::DataType>) -> Result<Self::Kind, Self::Error> { + Ok(PersonActor { + preferred_username: self.name.clone(), + kind: Default::default(), + id: self.federation_id.clone().into(), + inbox: self.inbox.clone(), + public_key: self.public_key(), + }) + } + + async fn verify( + json: &Self::Kind, + expected_domain: &Url, + _data: &Data<Self::DataType>, + ) -> Result<(), Self::Error> { + verify_domains_match(json.id.inner(), expected_domain)?; + Ok(()) + } + + async fn from_json( + json: Self::Kind, + _data: &Data<Self::DataType>, + ) -> Result<Self, Self::Error> { + Ok(DbUser { + name: json.preferred_username, + federation_id: json.id.into(), + inbox: json.inbox, + public_key: json.public_key.public_key_pem, + private_key: None, + followers: vec![], + local: false, + }) + } + } + + impl Actor for DbUser { + fn id(&self) -> Url { + self.federation_id.clone() + } + + fn public_key_pem(&self) -> &str { + &self.public_key + } + + fn inbox(&self) -> Url { + self.inbox.clone() + } + } + + impl LocalActor for DbLocalUser { + fn federation_id(&self) -> Url { + self.0.federation_id.clone() + } + + fn private_key_pem(&self) -> &str { + &self + .0 + .private_key + .as_ref() + .expect("Local user must have a private key") + } + } + + #[derive(Deserialize, Serialize, Clone, Debug)] + #[serde(rename_all = "camelCase")] + pub(crate) struct Activity { + pub actor: ObjectId<DbUser>, + pub object: ObjectId<DbUser>, + #[serde(rename = "type")] + pub kind: kind::ActivityType, + pub id: Url, + } + + #[async_trait] + impl ActivityHandler for Activity { + type DataType = DbConnection; + type Error = Error; + + fn id(&self) -> &Url { + &self.id + } + + fn actor(&self) -> &Url { + self.actor.inner() + } + + async fn verify(&self, _: &Data<Self::DataType>) -> Result<(), Self::Error> { + Ok(()) + } + + async fn receive(self, _data: &Data<Self::DataType>) -> Result<(), Self::Error> { + Ok(()) + } + } + + #[derive(Clone, Debug, Deserialize, Serialize)] + #[serde(rename_all = "camelCase")] + pub struct Note {} + #[derive(Debug, Clone)] + pub struct DbPost {} + + #[async_trait] + impl Object for DbPost { + type DataType = DbConnection; + type Kind = Note; + type Error = Error; + + async fn read_from_id( + _: Url, + _: &Data<Self::DataType>, + ) -> Result<Option<Self>, Self::Error> { + todo!() + } + + async fn into_json(self, _: &Data<Self::DataType>) -> Result<Self::Kind, Self::Error> { + todo!() + } + + async fn verify( + _: &Self::Kind, + _: &Url, + _: &Data<Self::DataType>, + ) -> Result<(), Self::Error> { + todo!() + } + + async fn from_json(_: Self::Kind, _: &Data<Self::DataType>) -> Result<Self, Self::Error> { + todo!() + } + } +}