Merge pull request 'rust: porting activitypub-federation-rust' (#10140) from nmkj/calckey:rust/activitypub into refactor/rust

Reviewed-on: https://codeberg.org/calckey/calckey/pulls/10140
This commit is contained in:
Kainoa Kanter 2023-05-18 05:33:46 +00:00
commit 704b599d39
19 changed files with 2727 additions and 0 deletions

View file

@ -0,0 +1,40 @@
[package]
name = "activitypub"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
anyhow = "1.0.71"
async-trait = "0.1.68"
base64 = "0.21.0"
bytes = "1.4.0"
chrono = { version = "0.4.24", default-features = false, features = ["clock"] }
derive_builder = "0.12.0"
displaydoc = "0.2.4"
dyn-clone = "1.0.11"
enum_delegate = "0.2.0"
futures-core = { version = "0.3.28", default-features = false }
http = "0.2.9"
http-signature-normalization = "0.7.0"
http-signature-normalization-reqwest = { version = "0.8.0", default-features = false, features = [
"sha-2",
"middleware",
] }
httpdate = "1.0.2"
itertools = "0.10.5"
once_cell = "1.17.1"
openssl = "0.10.52"
parse-display = "0.8.0"
pin-project-lite = "0.2.9"
regex = { version = "1.8.1", default-features = false, features = ["std"] }
reqwest = { version = "0.11.17", features = ["json", "stream"] }
reqwest-middleware = "0.2.2"
serde = { version = "1.0.163", features = ["derive"] }
serde_json = { version = "1.0.96", features = ["preserve_order"] }
sha2 = "0.10.6"
thiserror = "1.0.40"
tokio = { version = "1.28.1", features = ["test-util", "macros"] }
tracing = "0.1.37"
url = { version = "2.3.1", features = ["serde"] }

View file

@ -0,0 +1,47 @@
// GNU Affero General Public License v3.0
// https://github.com/LemmyNet/activitypub-federation-rust
//! Error messages returned by this library
use displaydoc::Display;
/// Error messages returned by this library
#[derive(thiserror::Error, Debug, Display)]
pub enum Error {
/// Requested object was not found in local database
NotFound,
/// Request limit was reached during fetch
RequestLimit,
/// Response body limit was reached during fetch
ResponseBodyLimit,
/// Object to be fetched was deleted
ObjectDeleted,
/// Url in object was invalid: {0}
UrlVerificationError(&'static str),
/// Incoming activity has invalid digest for body
ActivityBodyDigestInvalid,
/// Incoming activity has invalid signature
ActivitySignatureInvalid,
/// Failed to resolve actor via webfinger
WebfingerResolveFailed,
/// Json in request/response was invalid: {0}
JsonError(#[from] serde_json::Error),
/// Other errors which are not explicitly handled
#[error(transparent)]
Other(#[from] anyhow::Error),
}
impl Error {
pub fn other<T>(error: T) -> Self
where
T: Into<anyhow::Error>,
{
Error::Other(error.into())
}
}
impl PartialEq for Error {
fn eq(&self, other: &Self) -> bool {
std::mem::discriminant(self) == std::mem::discriminant(other)
}
}

View file

@ -0,0 +1,304 @@
// GNU Affero General Public License v3.0
// https://github.com/LemmyNet/activitypub-federation-rust
//! Configuration for this library, with various federation settings
//!
//! Use [FederationConfig::builder](crate::config::FederationConfig::builder) to initialize it.
//!
//! ```
//! # use activitypub_federation::config::FederationConfig;
//! # let _ = actix_rt::System::new();
//! let settings = FederationConfig::builder()
//! .domain("example.com")
//! .app_data(())
//! .http_fetch_limit(50)
//! .worker_count(16)
//! .build()?;
//! # Ok::<(), anyhow::Error>(())
//! ```
use crate::error::Error;
use crate::federation::{protocol::verification::verify_domains_match, traits::ActivityHandler};
use crate::queue::QueueManager;
use async_trait::async_trait;
use derive_builder::Builder;
use dyn_clone::{clone_trait_object, DynClone};
use reqwest_middleware::ClientWithMiddleware;
use serde::de::DeserializeOwned;
use std::{
ops::Deref,
sync::atomic::{AtomicU32, Ordering},
time::Duration,
};
use url::Url;
/// Configuration for this library, with various federation related settings
#[derive(Builder, Clone)]
#[builder(build_fn(private, name = "partial_build"))]
pub struct FederationConfig<T: Clone> {
/// The domain where this federated instance is running
#[builder(setter(into))]
pub(crate) domain: String,
/// Data which the application requires in handlers, such as database connection
/// or configuration.
pub(crate) app_data: T,
/// Maximum number of outgoing HTTP requests per incoming HTTP request. See
/// [crate::federation::fetch::object_id::ObjectId] for more details.
#[builder(default = "20")]
pub(crate) http_fetch_limit: u32,
#[builder(default = "reqwest::Client::default().into()")]
/// HTTP client used for all outgoing requests. Middleware can be used to add functionality
/// like log tracing or retry of failed requests.
pub(crate) client: ClientWithMiddleware,
/// Run library in debug mode. This allows usage of http and localhost urls. It also sends
/// outgoing activities synchronously, not in background thread. This helps to make tests
/// more consistent. Do not use for production.
#[builder(default = "false")]
pub(crate) debug: bool,
/// Timeout for all HTTP requests. HTTP signatures are valid for 10s, so it makes sense to
/// use the same as timeout when sending
#[builder(default = "Duration::from_secs(10)")]
pub(crate) request_timeout: Duration,
/// Function used to verify that urls are valid, See [UrlVerifier] for details.
#[builder(default = "Box::new(DefaultUrlVerifier())")]
pub(crate) url_verifier: Box<dyn UrlVerifier + Sync>,
/// Enable to sign HTTP signatures according to draft 10, which does not include (created) and
/// (expires) fields. This is required for compatibility with some software like Pleroma.
/// <https://datatracker.ietf.org/doc/html/draft-cavage-http-signatures-10>
/// <https://git.pleroma.social/pleroma/pleroma/-/issues/2939>
#[builder(default = "false")]
pub(crate) http_signature_compat: bool,
/// Queue for sending outgoing activities.
pub(crate) queue_manager: Box<dyn QueueManager + Sync>,
}
impl<T: Clone> FederationConfig<T> {
/// Returns a new config builder with default values.
pub fn builder() -> FederationConfigBuilder<T> {
FederationConfigBuilder::default()
}
pub(crate) async fn verify_url_and_domain<Activity, Datatype>(
&self,
activity: &Activity,
) -> Result<(), Error>
where
Activity: ActivityHandler<DataType = Datatype> + DeserializeOwned + Send + 'static,
{
verify_domains_match(activity.id(), activity.actor())?;
self.verify_url_valid(activity.id()).await?;
if self.is_local_url(activity.id()) {
return Err(Error::UrlVerificationError(
"Activity was sent from local instance",
));
}
Ok(())
}
/// Create new [Data] from this. You should prefer to use a middleware if possible.
pub fn to_request_data(&self) -> Data<T> {
Data {
config: self.clone(),
request_counter: Default::default(),
}
}
/// Perform some security checks on URLs as mentioned in activitypub spec, and call user-supplied
/// [`InstanceSettings.verify_url_function`].
///
/// https://www.w3.org/TR/activitypub/#security-considerations
pub(crate) async fn verify_url_valid(&self, url: &Url) -> Result<(), Error> {
match url.scheme() {
"https" => {}
"http" => {
if !self.debug {
return Err(Error::UrlVerificationError(
"Http urls are only allowed in debug mode",
));
}
}
_ => return Err(Error::UrlVerificationError("Invalid url scheme")),
};
// Urls which use our local domain are not a security risk, no further verification needed
if self.is_local_url(url) {
return Ok(());
}
if url.domain().is_none() {
return Err(Error::UrlVerificationError("Url must have a domain"));
}
if url.domain() == Some("localhost") && !self.debug {
return Err(Error::UrlVerificationError(
"Localhost is only allowed in debug mode",
));
}
self.url_verifier
.verify(url)
.await
.map_err(Error::UrlVerificationError)?;
Ok(())
}
/// Returns true if the url refers to this instance. Handles hostnames like `localhost:8540` for
/// local debugging.
pub(crate) fn is_local_url(&self, url: &Url) -> bool {
let mut domain = url.domain().expect("id has domain").to_string();
if let Some(port) = url.port() {
domain = format!("{}:{}", domain, port);
}
domain == self.domain
}
/// Returns the local domain
pub fn domain(&self) -> &str {
&self.domain
}
}
impl<T: Clone> FederationConfigBuilder<T> {
/// Constructs a new config instance with the values supplied to builder.
///
/// Values which are not explicitly specified use the defaults. Also initializes the
/// queue for activities, which is stored internally in the config struct.
pub fn build(&mut self) -> Result<FederationConfig<T>, FederationConfigBuilderError> {
let mut config = self.partial_build()?;
/* TODO: queue initialization here */
// let stats_handler = background_jobs::metrics::install().ok();
// config.queue_metrics = if let Some(stats) = stats_handler {
// Some(Arc::new(stats))
// } else {
// None
// };
// let queue = create_activity_queue(
// config.client.clone(),
// config.worker_count,
// config.request_timeout,
// config.debug,
// config.queue_db.to_owned(),
// );
// config.activity_queue = Some(Arc::new(queue));
Ok(config)
}
}
impl<T: Clone> Deref for FederationConfig<T> {
type Target = T;
fn deref(&self) -> &Self::Target {
&self.app_data
}
}
/// Handler for validating URLs.
///
/// This is used for implementing domain blocklists and similar functionality. It is called
/// with the ID of newly received activities, when fetching remote data from a given URL
/// and before sending an activity to a given inbox URL. If processing for this domain/URL should
/// be aborted, return an error. In case of `Ok(())`, processing continues.
///
/// ```
/// # use async_trait::async_trait;
/// # use url::Url;
/// # use activitypub_federation::config::UrlVerifier;
/// # #[derive(Clone)]
/// # struct DatabaseConnection();
/// # async fn get_blocklist(_: &DatabaseConnection) -> Vec<String> {
/// # vec![]
/// # }
/// #[derive(Clone)]
/// struct Verifier {
/// db_connection: DatabaseConnection,
/// }
///
/// #[async_trait]
/// impl UrlVerifier for Verifier {
/// async fn verify(&self, url: &Url) -> Result<(), &'static str> {
/// let blocklist = get_blocklist(&self.db_connection).await;
/// let domain = url.domain().unwrap().to_string();
/// if blocklist.contains(&domain) {
/// Err("Domain is blocked")
/// } else {
/// Ok(())
/// }
/// }
/// }
/// ```
#[async_trait]
pub trait UrlVerifier: DynClone + Send {
/// Should return Ok iff the given url is valid for processing.
async fn verify(&self, url: &Url) -> Result<(), &'static str>;
}
/// Default URL verifier which does nothing.
#[derive(Clone)]
struct DefaultUrlVerifier();
#[async_trait]
impl UrlVerifier for DefaultUrlVerifier {
async fn verify(&self, _url: &Url) -> Result<(), &'static str> {
Ok(())
}
}
clone_trait_object!(UrlVerifier);
/// Stores data for handling one specific HTTP request.
///
/// It gives acess to the `app_data` which was passed to [FederationConfig::builder].
///
/// Additionally it contains a counter for outgoing HTTP requests. This is necessary to
/// prevent denial of service attacks, where an attacker triggers fetching of recursive objects.
///
/// <https://www.w3.org/TR/activitypub/#security-recursive-objects>
pub struct Data<T: Clone> {
pub(crate) config: FederationConfig<T>,
pub(crate) request_counter: AtomicU32,
}
impl<T: Clone> Data<T> {
/// Returns the data which was stored in [FederationConfigBuilder::app_data]
pub fn app_data(&self) -> &T {
&self.config.app_data
}
/// The domain that was configured in [FederationConfig].
pub fn domain(&self) -> &str {
&self.config.domain
}
/// Returns a new instance of `Data` with request counter set to 0.
pub fn reset_request_count(&self) -> Self {
Data {
config: self.config.clone(),
request_counter: Default::default(),
}
}
/// Total number of outgoing HTTP requests made with this data.
pub fn request_count(&self) -> u32 {
self.request_counter.load(Ordering::Relaxed)
}
}
impl<T: Clone> Deref for Data<T> {
type Target = T;
fn deref(&self) -> &T {
&self.config.app_data
}
}
/// Middleware for HTTP handlers which provides access to [Data]
#[derive(Clone)]
pub struct FederationMiddleware<T: Clone>(pub(crate) FederationConfig<T>);
impl<T: Clone> FederationMiddleware<T> {
/// Construct a new middleware instance
pub fn new(config: FederationConfig<T>) -> Self {
FederationMiddleware(config)
}
}

View file

@ -0,0 +1,106 @@
// GNU Affero General Public License v3.0
// https://github.com/LemmyNet/activitypub-federation-rust
use crate::error::Error;
use crate::federation::{
config::Data,
fetch::fetch_object_http,
traits::{Collection, LocalActor},
};
use serde::{Deserialize, Serialize};
use std::{
fmt::{Debug, Display, Formatter},
marker::PhantomData,
};
use url::Url;
/// Typed wrapper for Activitypub Collection ID which helps with dereferencing.
#[derive(Serialize, Deserialize)]
#[serde(transparent)]
pub struct CollectionId<Kind>(Box<Url>, PhantomData<Kind>)
where
Kind: Collection,
for<'de2> <Kind as Collection>::Kind: Deserialize<'de2>;
impl<Kind> CollectionId<Kind>
where
Kind: Collection,
for<'de2> <Kind as Collection>::Kind: Deserialize<'de2>,
{
/// Construct a new CollectionId instance
pub fn parse<T>(url: T) -> Result<Self, url::ParseError>
where
T: TryInto<Url>,
url::ParseError: From<<T as TryInto<Url>>::Error>,
{
Ok(Self(Box::new(url.try_into()?), PhantomData::<Kind>))
}
/// Fetches collection over HTTP
///
/// Unlike [ObjectId::fetch](crate::fetch::object_id::ObjectId::fetch) this method doesn't do
/// any caching.
pub async fn dereference(
&self,
owner: &<Kind as Collection>::Owner,
local_actor: &impl LocalActor,
data: &Data<<Kind as Collection>::DataType>,
) -> Result<Kind, <Kind as Collection>::Error>
where
<Kind as Collection>::Error: From<Error>,
{
let json = fetch_object_http(&self.0, local_actor, data).await?;
Kind::verify(&json, &self.0, data).await?;
Kind::from_json(json, owner, data).await
}
}
/// Need to implement clone manually, to avoid requiring Kind to be Clone
impl<Kind> Clone for CollectionId<Kind>
where
Kind: Collection,
for<'de2> <Kind as Collection>::Kind: serde::Deserialize<'de2>,
{
fn clone(&self) -> Self {
CollectionId(self.0.clone(), self.1)
}
}
impl<Kind> Display for CollectionId<Kind>
where
Kind: Collection,
for<'de2> <Kind as Collection>::Kind: serde::Deserialize<'de2>,
{
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.0.as_str())
}
}
impl<Kind> Debug for CollectionId<Kind>
where
Kind: Collection,
for<'de2> <Kind as Collection>::Kind: serde::Deserialize<'de2>,
{
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.0.as_str())
}
}
impl<Kind> From<CollectionId<Kind>> for Url
where
Kind: Collection,
for<'de2> <Kind as Collection>::Kind: serde::Deserialize<'de2>,
{
fn from(id: CollectionId<Kind>) -> Self {
*id.0
}
}
impl<Kind> From<Url> for CollectionId<Kind>
where
Kind: Collection + Send + 'static,
for<'de2> <Kind as Collection>::Kind: serde::Deserialize<'de2>,
{
fn from(url: Url) -> Self {
CollectionId(Box::new(url), PhantomData::<Kind>)
}
}

View file

@ -0,0 +1,99 @@
// GNU Affero General Public License v3.0
// https://github.com/LemmyNet/activitypub-federation-rust
//! Utilities for fetching data from other servers
use crate::error::Error;
use crate::federation::{
config::Data, http_signature::sign_request, reqwest_shim::ResponseExt, traits::LocalActor,
FEDERATION_CONTENT_TYPE,
};
use http::{HeaderMap, HeaderName, HeaderValue, StatusCode};
use httpdate::fmt_http_date;
use serde::de::DeserializeOwned;
use std::{sync::atomic::Ordering, time::SystemTime};
use tracing::info;
use url::Url;
/// Typed wrapper for collection IDs
pub mod collection_id;
/// Typed wrapper for Activitypub Object ID which helps with dereferencing and caching
pub mod object_id;
/// Resolves identifiers of the form `name@example.com`
pub mod webfinger;
/// Fetch a remote object over HTTP and convert to `Kind`.
///
/// [crate::fetch::object_id::ObjectId::dereference] wraps this function to add caching and
/// conversion to database type. Only use this function directly in exceptional cases where that
/// behaviour is undesired.
///
/// Every time an object is fetched via HTTP, [RequestData.request_counter] is incremented by one.
/// If the value exceeds [FederationSettings.http_fetch_limit], the request is aborted with
/// [Error::RequestLimit]. This prevents denial of service attacks where an attack triggers
/// infinite, recursive fetching of data.
pub async fn fetch_object_http<T: Clone, Kind: DeserializeOwned>(
url: &Url,
local_actor: &impl LocalActor,
data: &Data<T>,
) -> Result<Kind, Error> {
let config = &data.config;
// dont fetch local objects this way
debug_assert!(url.domain() != Some(&config.domain));
config.verify_url_valid(url).await?;
info!("Fetching remote object {}", url.to_string());
let counter = data.request_counter.fetch_add(1, Ordering::SeqCst);
if counter > config.http_fetch_limit {
return Err(Error::RequestLimit);
}
let req = config
.client
.get(url.as_str())
.header("Accept", FEDERATION_CONTENT_TYPE)
.timeout(config.request_timeout);
let signed_req = sign_request(
req,
local_actor.federation_id(),
String::new(),
LocalActor::private_key_pem(local_actor).to_string(),
false,
)
.await?;
let res = config
.client
.execute(signed_req)
.await
.map_err(Error::other)?;
if res.status() == StatusCode::GONE {
return Err(Error::ObjectDeleted);
}
res.json_limited().await
}
pub(crate) fn generate_request_headers(inbox_url: &Url) -> HeaderMap {
let mut host = inbox_url.domain().expect("read inbox domain").to_string();
if let Some(port) = inbox_url.port() {
host = format!("{}:{}", host, port);
}
let mut headers = HeaderMap::new();
headers.insert(
HeaderName::from_static("content-type"),
HeaderValue::from_static(FEDERATION_CONTENT_TYPE),
);
headers.insert(
HeaderName::from_static("host"),
HeaderValue::from_str(&host).expect("Hostname is valid"),
);
headers.insert(
"date",
HeaderValue::from_str(&fmt_http_date(SystemTime::now())).expect("Date is valid"),
);
headers
}

View file

@ -0,0 +1,282 @@
// GNU Affero General Public License v3.0
// https://github.com/LemmyNet/activitypub-federation-rust
use crate::error::Error;
use crate::federation::{
config::Data,
fetch::fetch_object_http,
traits::{LocalActor, Object},
};
use anyhow::anyhow;
use chrono::{Duration as ChronoDuration, NaiveDateTime, Utc};
use serde::{Deserialize, Serialize};
use std::{
fmt::{Debug, Display, Formatter},
marker::PhantomData,
str::FromStr,
};
use url::Url;
impl<T> FromStr for ObjectId<T>
where
T: Object + Send + 'static,
for<'de2> <T as Object>::Kind: Deserialize<'de2>,
{
type Err = url::ParseError;
fn from_str(s: &str) -> Result<Self, Self::Err> {
ObjectId::parse(s)
}
}
/// Typed wrapper for Activitypub Object ID which helps with dereferencing and caching.
///
/// It provides convenient methods for fetching the object from remote server or local database.
/// Objects are automatically cached locally, so they don't have to be fetched every time. Much of
/// the crate functionality relies on this wrapper.
///
/// Every time an object is fetched via HTTP, [RequestData.request_counter] is incremented by one.
/// If the value exceeds [FederationSettings.http_fetch_limit], the request is aborted with
/// [Error::RequestLimit]. This prevents denial of service attacks where an attack triggers
/// infinite, recursive fetching of data.
///
/// ```rust
/// # use activitypub_federation::fetch::object_id::ObjectId;
/// # use activitypub_federation::config::FederationConfig;
/// # use activitypub_federation::error::Error::NotFound;
/// # use activitypub_federation::traits::tests::{DbConnection, DbUser, DB_LOCAL_USER};
/// # let _ = actix_rt::System::new();
/// # actix_rt::Runtime::new().unwrap().block_on(async {
/// # let db_connection = DbConnection;
/// let config = FederationConfig::builder()
/// .domain("example.com")
/// .app_data(db_connection)
/// .build()?;
/// let request_data = config.to_request_data();
/// let object_id = ObjectId::<DbUser>::parse("https://lemmy.ml/u/nutomic")?;
/// // Attempt to fetch object from local database or fall back to remote server
/// let user = object_id.dereference(&DB_LOCAL_USER.clone(), &request_data).await;
/// assert!(user.is_ok());
/// // Now you can also read the object from local database without network requests
/// let user = object_id.dereference_local(&request_data).await;
/// assert!(user.is_ok());
/// # Ok::<(), anyhow::Error>(())
/// # }).unwrap();
/// ```
#[derive(Serialize, Deserialize)]
#[serde(transparent)]
pub struct ObjectId<Kind>(Box<Url>, PhantomData<Kind>)
where
Kind: Object,
for<'de2> <Kind as Object>::Kind: serde::Deserialize<'de2>;
impl<Kind> ObjectId<Kind>
where
Kind: Object + Send + 'static,
for<'de2> <Kind as Object>::Kind: serde::Deserialize<'de2>,
{
/// Construct a new objectid instance
pub fn parse<T>(url: T) -> Result<Self, url::ParseError>
where
T: TryInto<Url>,
url::ParseError: From<<T as TryInto<Url>>::Error>,
{
Ok(ObjectId(Box::new(url.try_into()?), PhantomData::<Kind>))
}
/// Returns a reference to the wrapped URL value
pub fn inner(&self) -> &Url {
&self.0
}
/// Returns the wrapped URL value
pub fn into_inner(self) -> Url {
*self.0
}
/// Fetches an activitypub object, either from local database (if possible), or over http.
pub async fn dereference(
&self,
local_actor: &impl LocalActor,
data: &Data<<Kind as Object>::DataType>,
) -> Result<Kind, <Kind as Object>::Error>
where
<Kind as Object>::Error: From<Error> + From<anyhow::Error>,
{
let db_object = self.dereference_from_db(data).await?;
// if its a local object, only fetch it from the database and not over http
if data.config.is_local_url(&self.0) {
return match db_object {
None => Err(Error::NotFound.into()),
Some(o) => Ok(o),
};
}
// object found in database
if let Some(object) = db_object {
// object is old and should be refetched
if let Some(last_refreshed_at) = object.last_refreshed_at() {
if should_refetch_object(last_refreshed_at) {
return self
.dereference_from_http(data, local_actor, Some(object))
.await;
}
}
Ok(object)
}
// object not found, need to fetch over http
else {
self.dereference_from_http(data, local_actor, None).await
}
}
/// Fetch an object from the local db. Instead of falling back to http, this throws an error if
/// the object is not found in the database.
pub async fn dereference_local(
&self,
data: &Data<<Kind as Object>::DataType>,
) -> Result<Kind, <Kind as Object>::Error>
where
<Kind as Object>::Error: From<Error>,
{
let object = self.dereference_from_db(data).await?;
object.ok_or_else(|| Error::NotFound.into())
}
/// returning none means the object was not found in local db
async fn dereference_from_db(
&self,
data: &Data<<Kind as Object>::DataType>,
) -> Result<Option<Kind>, <Kind as Object>::Error> {
let id = self.0.clone();
Object::read_from_id(*id, data).await
}
async fn dereference_from_http(
&self,
data: &Data<<Kind as Object>::DataType>,
local_actor: &impl LocalActor,
db_object: Option<Kind>,
) -> Result<Kind, <Kind as Object>::Error>
where
<Kind as Object>::Error: From<Error> + From<anyhow::Error>,
{
let res = fetch_object_http(&self.0, local_actor, data).await;
if let Err(Error::ObjectDeleted) = &res {
if let Some(db_object) = db_object {
db_object.delete(data).await?;
}
return Err(anyhow!("Fetched remote object {} which was deleted", self).into());
}
let res2 = res?;
Kind::verify(&res2, self.inner(), data).await?;
Kind::from_json(res2, data).await
}
}
/// Need to implement clone manually, to avoid requiring Kind to be Clone
impl<Kind> Clone for ObjectId<Kind>
where
Kind: Object,
for<'de2> <Kind as Object>::Kind: serde::Deserialize<'de2>,
{
fn clone(&self) -> Self {
ObjectId(self.0.clone(), self.1)
}
}
static ACTOR_REFETCH_INTERVAL_SECONDS: i64 = 24 * 60 * 60;
static ACTOR_REFETCH_INTERVAL_SECONDS_DEBUG: i64 = 20;
/// Determines when a remote actor should be refetched from its instance. In release builds, this is
/// `ACTOR_REFETCH_INTERVAL_SECONDS` after the last refetch, in debug builds
/// `ACTOR_REFETCH_INTERVAL_SECONDS_DEBUG`.
fn should_refetch_object(last_refreshed: NaiveDateTime) -> bool {
let update_interval = if cfg!(debug_assertions) {
// avoid infinite loop when fetching community outbox
ChronoDuration::seconds(ACTOR_REFETCH_INTERVAL_SECONDS_DEBUG)
} else {
ChronoDuration::seconds(ACTOR_REFETCH_INTERVAL_SECONDS)
};
let refresh_limit = Utc::now().naive_utc() - update_interval;
last_refreshed.lt(&refresh_limit)
}
impl<Kind> Display for ObjectId<Kind>
where
Kind: Object,
for<'de2> <Kind as Object>::Kind: serde::Deserialize<'de2>,
{
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.0.as_str())
}
}
impl<Kind> Debug for ObjectId<Kind>
where
Kind: Object,
for<'de2> <Kind as Object>::Kind: serde::Deserialize<'de2>,
{
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.0.as_str())
}
}
impl<Kind> From<ObjectId<Kind>> for Url
where
Kind: Object,
for<'de2> <Kind as Object>::Kind: serde::Deserialize<'de2>,
{
fn from(id: ObjectId<Kind>) -> Self {
*id.0
}
}
impl<Kind> From<Url> for ObjectId<Kind>
where
Kind: Object + Send + 'static,
for<'de2> <Kind as Object>::Kind: serde::Deserialize<'de2>,
{
fn from(url: Url) -> Self {
ObjectId(Box::new(url), PhantomData::<Kind>)
}
}
impl<Kind> PartialEq for ObjectId<Kind>
where
Kind: Object,
for<'de2> <Kind as Object>::Kind: serde::Deserialize<'de2>,
{
fn eq(&self, other: &Self) -> bool {
self.0.eq(&other.0) && self.1 == other.1
}
}
#[cfg(test)]
pub mod tests {
use super::*;
use crate::federation::{fetch::object_id::should_refetch_object, traits::tests::DbUser};
#[test]
fn test_deserialize() {
let id = ObjectId::<DbUser>::parse("http://test.com/").unwrap();
let string = serde_json::to_string(&id).unwrap();
assert_eq!("\"http://test.com/\"", string);
let parsed: ObjectId<DbUser> = serde_json::from_str(&string).unwrap();
assert_eq!(parsed, id);
}
#[test]
fn test_should_refetch_object() {
let one_second_ago = Utc::now().naive_utc() - ChronoDuration::seconds(1);
assert!(!should_refetch_object(one_second_ago));
let two_days_ago = Utc::now().naive_utc() - ChronoDuration::days(2);
assert!(should_refetch_object(two_days_ago));
}
}

View file

@ -0,0 +1,224 @@
// GNU Affero General Public License v3.0
// https://github.com/LemmyNet/activitypub-federation-rust
use crate::error::{Error, Error::WebfingerResolveFailed};
use crate::federation::{
config::Data,
fetch::{fetch_object_http, object_id::ObjectId},
traits::{Actor, LocalActor, Object},
FEDERATION_CONTENT_TYPE,
};
use anyhow::anyhow;
use itertools::Itertools;
use regex::Regex;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use tracing::debug;
use url::Url;
/// Takes an identifier of the form `name@example.com`, and returns an object of `Kind`.
///
/// For this the identifier is first resolved via webfinger protocol to an Activitypub ID. This ID
/// is then fetched using [ObjectId::dereference], and the result returned.
///
/// `instance_actor` will be used to sign fetch request, so it should be a system account with
/// the type of Application or Service.
pub async fn webfinger_resolve_actor<T: Clone, Kind>(
identifier: &str,
instance_actor: &impl LocalActor,
data: &Data<T>,
) -> Result<Kind, <Kind as Object>::Error>
where
Kind: Object + Actor + Send + 'static + Object<DataType = T>,
for<'de2> <Kind as Object>::Kind: serde::Deserialize<'de2>,
<Kind as Object>::Error:
From<crate::error::Error> + From<anyhow::Error> + From<url::ParseError> + Send + Sync,
{
let (_, domain) = identifier
.splitn(2, '@')
.collect_tuple()
.ok_or(WebfingerResolveFailed)?;
let protocol = if data.config.debug { "http" } else { "https" };
let fetch_url =
format!("{protocol}://{domain}/.well-known/webfinger?resource=acct:{identifier}");
debug!("Fetching webfinger url: {}", &fetch_url);
let res: Webfinger = fetch_object_http(&Url::parse(&fetch_url)?, instance_actor, data).await?;
debug_assert_eq!(res.subject, format!("acct:{identifier}"));
let links: Vec<Url> = res
.links
.iter()
.filter(|link| {
if let Some(type_) = &link.kind {
type_.starts_with("application/")
} else {
false
}
})
.filter_map(|l| l.href.clone())
.collect();
for l in links {
let object = ObjectId::<Kind>::from(l)
.dereference(instance_actor, data)
.await;
if object.is_ok() {
return object;
}
}
Err(WebfingerResolveFailed.into())
}
/// Extracts username from a webfinger resource parameter.
///
/// Use this method for your HTTP handler at `.well-known/webfinger` to handle incoming webfinger
/// request. For a parameter of the form `acct:gargron@mastodon.social` it returns `gargron`.
///
/// Returns an error if query doesn't match local domain.
pub fn extract_webfinger_name<T>(query: &str, data: &Data<T>) -> Result<String, Error>
where
T: Clone,
{
// TODO: would be nice if we could implement this without regex and remove the dependency
// Regex taken from Mastodon -
// https://github.com/mastodon/mastodon/blob/2b113764117c9ab98875141bcf1758ba8be58173/app/models/account.rb#L65
let regex = Regex::new(&format!(
"^acct:((?i)[a-z0-9_]+([a-z0-9_\\.-]+[a-z0-9_]+)?)@{}$",
data.domain()
))
.map_err(Error::other)?;
Ok(regex
.captures(query)
.and_then(|c| c.get(1))
.ok_or_else(|| Error::other(anyhow!("Webfinger regex failed to match")))?
.as_str()
.to_string())
}
/// Builds a basic webfinger response for the actor.
///
/// It assumes that the given URL is valid both to the view the actor in a browser as HTML, and
/// for fetching it over Activitypub with `activity+json`. This setup is commonly used for ease
/// of discovery.
///
/// ```
/// # use url::Url;
/// # use activitypub_federation::fetch::webfinger::build_webfinger_response;
/// let subject = "acct:nutomic@lemmy.ml".to_string();
/// let url = Url::parse("https://lemmy.ml/u/nutomic")?;
/// build_webfinger_response(subject, url);
/// # Ok::<(), anyhow::Error>(())
/// ```
pub fn build_webfinger_response(subject: String, url: Url) -> Webfinger {
build_webfinger_response_with_type(subject, vec![(url, None)])
}
/// Builds a webfinger response similar to `build_webfinger_response`. Use this when you want to
/// return multiple actors who share the same namespace and to specify the type of the actor.
///
/// `urls` takes a vector of tuples. The first item of the tuple is the URL while the second
/// item is the type, such as `"Person"` or `"Group"`. If `None` is passed for the type, the field
/// will be empty.
///
/// ```
/// # use url::Url;
/// # use activitypub_federation::fetch::webfinger::build_webfinger_response_with_type;
/// let subject = "acct:nutomic@lemmy.ml".to_string();
/// let user = Url::parse("https://lemmy.ml/u/nutomic")?;
/// let group = Url::parse("https://lemmy.ml/c/asklemmy")?;
/// build_webfinger_response_with_type(subject, vec![
/// (user, Some("Person")),
/// (group, Some("Group"))]);
/// # Ok::<(), anyhow::Error>(())
/// ```
pub fn build_webfinger_response_with_type(
subject: String,
urls: Vec<(Url, Option<&str>)>,
) -> Webfinger {
Webfinger {
subject,
links: urls.iter().fold(vec![], |mut acc, (url, kind)| {
let properties: HashMap<Url, String> = kind
.map(|kind| {
HashMap::from([(
"https://www.w3.org/ns/activitystreams#type"
.parse()
.expect("parse url"),
kind.to_string(),
)])
})
.unwrap_or_default();
let mut links = vec![
WebfingerLink {
rel: Some("http://webfinger.net/rel/profile-page".to_string()),
kind: Some("text/html".to_string()),
href: Some(url.clone()),
properties: Default::default(),
},
WebfingerLink {
rel: Some("self".to_string()),
kind: Some(FEDERATION_CONTENT_TYPE.to_string()),
href: Some(url.clone()),
properties,
},
];
acc.append(&mut links);
acc
}),
aliases: vec![],
properties: Default::default(),
}
}
/// A webfinger response with information about a `Person` or other type of actor.
#[derive(Serialize, Deserialize, Debug, Default)]
pub struct Webfinger {
/// The actor which is described here, for example `acct:LemmyDev@mastodon.social`
pub subject: String,
/// Links where further data about `subject` can be retrieved
pub links: Vec<WebfingerLink>,
/// Other Urls which identify the same actor as the `subject`
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub aliases: Vec<Url>,
/// Additional data about the subject
#[serde(default, skip_serializing_if = "HashMap::is_empty")]
pub properties: HashMap<Url, String>,
}
/// A single link included as part of a [Webfinger] response.
#[derive(Serialize, Deserialize, Debug, Default)]
pub struct WebfingerLink {
/// Relationship of the link, such as `self` or `http://webfinger.net/rel/profile-page`
pub rel: Option<String>,
/// Media type of the target resource
#[serde(rename = "type")]
pub kind: Option<String>,
/// Url pointing to the target resource
pub href: Option<Url>,
/// Additional data about the link
#[serde(default, skip_serializing_if = "HashMap::is_empty")]
pub properties: HashMap<Url, String>,
}
// #[cfg(test)]
// mod tests {
// use super::*;
// use crate::{
// config::FederationConfig,
// traits::tests::{DbConnection, DbUser},
// };
// #[actix_rt::test]
// async fn test_webfinger() {
// let config = FederationConfig::builder()
// .domain("example.com")
// .app_data(DbConnection)
// .build()
// .unwrap();
// let data = config.to_request_data();
// let res =
// webfinger_resolve_actor::<DbConnection, DbUser>("LemmyDev@mastodon.social", &data)
// .await;
// assert!(res.is_ok());
// }
// }

View file

@ -0,0 +1,357 @@
// GNU Affero General Public License v3.0
// https://github.com/LemmyNet/activitypub-federation-rust
//! Generating keypairs, creating and verifying signatures
//!
//! Signature creation and verification is handled internally in the library. See
//! [send_activity](crate::activity_queue::send_activity) and
//! [receive_activity (actix-web)](crate::actix_web::inbox::receive_activity) /
//! [receive_activity (axum)](crate::axum::inbox::receive_activity).
use crate::error::{Error, Error::ActivitySignatureInvalid};
use crate::federation::protocol::public_key::main_key_id;
use base64::{engine::general_purpose::STANDARD as Base64, Engine};
use http::{header::HeaderName, uri::PathAndQuery, HeaderValue, Method, Uri};
use http_signature_normalization_reqwest::prelude::{Config, SignExt};
use once_cell::sync::Lazy;
use openssl::{
hash::MessageDigest,
pkey::PKey,
rsa::Rsa,
sign::{Signer, Verifier},
};
use reqwest::Request;
use reqwest_middleware::RequestBuilder;
use sha2::{Digest, Sha256};
use std::{collections::BTreeMap, fmt::Debug, io::ErrorKind};
use tracing::debug;
use url::Url;
/// A private/public key pair used for HTTP signatures
#[derive(Debug, Clone)]
pub struct Keypair {
/// Private key in PEM format
pub private_key: String,
/// Public key in PEM format
pub public_key: String,
}
/// Generate a random asymmetric keypair for ActivityPub HTTP signatures.
pub fn generate_actor_keypair() -> Result<Keypair, std::io::Error> {
let rsa = Rsa::generate(2048)?;
let pkey = PKey::from_rsa(rsa)?;
let public_key = pkey.public_key_to_pem()?;
let private_key = pkey.private_key_to_pem_pkcs8()?;
let key_to_string = |key| match String::from_utf8(key) {
Ok(s) => Ok(s),
Err(e) => Err(std::io::Error::new(
ErrorKind::Other,
format!("Failed converting key to string: {}", e),
)),
};
Ok(Keypair {
private_key: key_to_string(private_key)?,
public_key: key_to_string(public_key)?,
})
}
/// Creates an HTTP post request to `inbox_url`, with the given `client` and `headers`, and
/// `activity` as request body. The request is signed with `private_key` and then sent.
pub(crate) async fn sign_request(
request_builder: RequestBuilder,
actor_id: Url,
activity: String,
private_key: String,
http_signature_compat: bool,
) -> Result<Request, anyhow::Error> {
static CONFIG: Lazy<Config> = Lazy::new(Config::new);
static CONFIG_COMPAT: Lazy<Config> = Lazy::new(|| Config::new().mastodon_compat());
let key_id = main_key_id(&actor_id);
let sig_conf = match http_signature_compat {
false => CONFIG.clone(),
true => CONFIG_COMPAT.clone(),
};
request_builder
.signature_with_digest(
sig_conf.clone(),
key_id,
Sha256::new(),
activity,
move |signing_string| {
let private_key = PKey::private_key_from_pem(private_key.as_bytes())?;
let mut signer = Signer::new(MessageDigest::sha256(), &private_key)?;
signer.update(signing_string.as_bytes())?;
Ok(Base64.encode(signer.sign_to_vec()?)) as Result<_, anyhow::Error>
},
)
.await
}
static CONFIG2: Lazy<http_signature_normalization::Config> =
Lazy::new(http_signature_normalization::Config::new);
/// Verifies the HTTP signature on an incoming inbox request.
pub(crate) fn verify_signature<'a, H>(
headers: H,
method: &Method,
uri: &Uri,
public_key: &str,
) -> Result<(), Error>
where
H: IntoIterator<Item = (&'a HeaderName, &'a HeaderValue)>,
{
let mut header_map = BTreeMap::<String, String>::new();
for (name, value) in headers {
if let Ok(value) = value.to_str() {
header_map.insert(name.to_string(), value.to_string());
}
}
let path_and_query = uri.path_and_query().map(PathAndQuery::as_str).unwrap_or("");
let verified = CONFIG2
.begin_verify(method.as_str(), path_and_query, header_map)
.map_err(Error::other)?
.verify(|signature, signing_string| -> anyhow::Result<bool> {
debug!(
"Verifying with key {}, message {}",
&public_key, &signing_string
);
let public_key = PKey::public_key_from_pem(public_key.as_bytes())?;
let mut verifier = Verifier::new(MessageDigest::sha256(), &public_key)?;
verifier.update(signing_string.as_bytes())?;
Ok(verifier.verify(&Base64.decode(signature)?)?)
})
.map_err(Error::other)?;
if verified {
debug!("verified signature for {}", uri);
Ok(())
} else {
Err(ActivitySignatureInvalid)
}
}
#[derive(Clone, Debug)]
struct DigestPart {
/// We assume that SHA256 is used which is the case with all major fediverse platforms
#[allow(dead_code)]
pub algorithm: String,
/// The hashsum
pub digest: String,
}
impl DigestPart {
fn try_from_header(h: &HeaderValue) -> Option<Vec<DigestPart>> {
let h = h.to_str().ok()?.split(';').next()?;
let v: Vec<_> = h
.split(',')
.filter_map(|p| {
let mut iter = p.splitn(2, '=');
iter.next()
.and_then(|alg| iter.next().map(|value| (alg, value)))
})
.map(|(alg, value)| DigestPart {
algorithm: alg.to_owned(),
digest: value.to_owned(),
})
.collect();
if v.is_empty() {
None
} else {
Some(v)
}
}
}
/// Verify body of an inbox request against the hash provided in `Digest` header.
pub(crate) fn verify_inbox_hash(
digest_header: Option<&HeaderValue>,
body: &[u8],
) -> Result<(), Error> {
let digest = digest_header
.and_then(DigestPart::try_from_header)
.ok_or(Error::ActivityBodyDigestInvalid)?;
let mut hasher = Sha256::new();
for part in digest {
hasher.update(body);
if Base64.encode(hasher.finalize_reset()) != part.digest {
return Err(Error::ActivityBodyDigestInvalid);
}
}
Ok(())
}
#[cfg(test)]
pub mod test {
use crate::federation::fetch::generate_request_headers;
use super::*;
use reqwest::Client;
use reqwest_middleware::ClientWithMiddleware;
use std::str::FromStr;
static ACTOR_ID: Lazy<Url> = Lazy::new(|| Url::parse("https://example.com/u/alice").unwrap());
static REMOTE_ID: Lazy<Url> = Lazy::new(|| Url::parse("https://example.com/u/bob").unwrap());
static INBOX_URL: Lazy<Url> =
Lazy::new(|| Url::parse("https://example.com/u/alice/inbox").unwrap());
#[tokio::test]
async fn test_sign() {
let mut headers = generate_request_headers(&INBOX_URL);
// use hardcoded date in order to test against hardcoded signature
headers.insert(
"date",
HeaderValue::from_str("Tue, 28 Mar 2023 21:03:44 GMT").unwrap(),
);
let request_builder = ClientWithMiddleware::from(Client::new())
.post(INBOX_URL.to_string())
.headers(headers);
let request = sign_request(
request_builder,
ACTOR_ID.clone(),
"my activity".to_string(),
test_keypair().private_key,
// set this to prevent created/expires headers to be generated and inserted
// automatically from current time
true,
)
.await
.unwrap();
let signature = request
.headers()
.get("signature")
.unwrap()
.to_str()
.unwrap();
let expected_signature = concat!(
"keyId=\"https://example.com/u/alice#main-key\",",
"algorithm=\"hs2019\",",
"headers=\"(request-target) content-type date digest host\",",
"signature=\"BpZhHNqzd6d6jhWOxyJ0jXwWWxiKMNK7i3mrr/5mVFnH7fUpicwqw8cSYVr",
"cwWjt0I07HW7rZFUfIdSgCoOEdvxtrccF/hTrwYgm8O6SQRHl1UfFtDR6e9EpfPieVmTjo0",
"QVfyzLLa41rmnz/yFqqer/v0kcdED51/dGe8NCGPBbhgK6C4oh7r+XHsQZMIhh38BcfZVWN",
"YaMqgyhFxu2f34IKnOEk6NjSaNtO+PzQUhbksTvH0Vvi6R0dtQINJFdONVBl4AwDC1INeF5",
"uhQo/SaKHfP3UitUHdM5Pbn+LhZYDB9AaQAW5ZGD43Aw15ecwsnKi4HcjV8nBw4zehlvaQ==\""
);
assert_eq!(signature, expected_signature);
}
#[tokio::test]
async fn test_verify_post() {
let headers = generate_request_headers(&INBOX_URL);
let request_builder = ClientWithMiddleware::from(Client::new())
.post(INBOX_URL.to_string())
.headers(headers);
let request = sign_request(
request_builder,
ACTOR_ID.clone(),
"my activity".to_string(),
test_keypair().private_key,
false,
)
.await
.unwrap();
let valid = verify_signature(
request.headers(),
request.method(),
&Uri::from_str(request.url().as_str()).unwrap(),
&test_keypair().public_key,
);
println!("{:?}", &valid);
assert!(valid.is_ok());
}
#[tokio::test]
async fn test_verify_get() {
let headers = generate_request_headers(&REMOTE_ID);
let request_builder = ClientWithMiddleware::from(Client::new())
.get(REMOTE_ID.to_string())
.headers(headers);
let request = sign_request(
request_builder,
ACTOR_ID.to_owned(),
String::new(),
test_keypair().private_key,
false,
)
.await
.unwrap();
let valid = verify_signature(
request.headers(),
request.method(),
&Uri::from_str(request.url().as_str()).unwrap(),
&test_keypair().public_key,
);
println!("{:?}", &valid);
assert!(valid.is_ok());
}
#[test]
fn test_verify_inbox_hash_valid() {
let digest_header =
HeaderValue::from_static("SHA-256=lzFT+G7C2hdI5j8M+FuJg1tC+O6AGMVJhooTCKGfbKM=");
let body = "Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua.";
let valid = verify_inbox_hash(Some(&digest_header), body.as_bytes());
println!("{:?}", &valid);
assert!(valid.is_ok());
}
#[test]
fn test_verify_inbox_hash_not_valid() {
let digest_header =
HeaderValue::from_static("SHA-256=Z9h7DJfYWjffXw2XftmWCnpEaK/yqOHKvzCIzIaqgbU=");
let body = "lorem ipsum";
let invalid = verify_inbox_hash(Some(&digest_header), body.as_bytes());
assert_eq!(invalid, Err(Error::ActivityBodyDigestInvalid));
}
pub fn test_keypair() -> Keypair {
let rsa = Rsa::private_key_from_pem(PRIVATE_KEY.as_bytes()).unwrap();
let pkey = PKey::from_rsa(rsa).unwrap();
let private_key = pkey.private_key_to_pem_pkcs8().unwrap();
let public_key = pkey.public_key_to_pem().unwrap();
Keypair {
private_key: String::from_utf8(private_key).unwrap(),
public_key: String::from_utf8(public_key).unwrap(),
}
}
/// Hardcoded private key so that signature doesn't change across runs
const PRIVATE_KEY: &str = concat!(
"-----BEGIN RSA PRIVATE KEY-----\n",
"MIIEogIBAAKCAQEA2kZpsvWYrwM9zMQiDwo4k6/VfpK2aDTeVe9ZkcvDrrWfqt72\n",
"QSjjtXLa8sxJlEn+/zbnZ1lG3AO/WsKs2jiOycNQHBS1ITnSZKEpdKnAoLUn4k16\n",
"YivRmALyLedOfIrvMtQzH8a+kOQ71u2Wa3H9jpkCT5W9OneEBa3VjQp49kcrF3tm\n",
"mrEUhfai5GJM4xrdr587y7exkBF4wObepta9opSeuBkPV4QXZPfgmjwW+oOTheVH\n",
"6L7yjzvjW92j4/T6XKAcu0kn/aQhR8SiGtPBMyOlcW4S2eDHWf1RlqbNGb5L9Qam\n",
"fb0WAymx0ANLUDQyXAu5zViMrd4g8mgdkg7C1wIDAQABAoIBAAHAT0Uvsguz0Frq\n",
"0Li8+A4I4U/RQeqW6f9XtHWpl3NSYuqOPJZY2DxypHRB1Iex13x/gBHH/8jwgShR\n",
"2x/3ev9kmsLu6f+CcdniCFQdFiRaVh/IFI0Ve7cz5tkcoiuSB2NDNcaYFwIdYqfr\n",
"Ytz2OCn2hLQHKB9M9pLMSnDsPmMAOveY11XfhkECrWlh1bx9YPyJScnNKTblB3M+\n",
"GhYL3xzuCxPCC9nUfqz7Y8FnZTCmePOwcRflJDTLFs6Bqkv1PZOZWzI+7akaJxfI\n",
"SOSw3VkGegsdoGVgHobqT2tqL8vuKM1bs47PFwWjVCGEoOvcC/Ha1+INemWbh7VA\n",
"Xa/jvxkCgYEA/+AxeMCLCmH/F696W3RpPdFL25wSYQr1auV2xRfmsT+hhpSp3yz/\n",
"ypkazS9TbnSCm18up+jE9rJ1c9VIZrgcTeKzPURzE68RR8uOsa9o9kaUzfyvRAzb\n",
"fmQXMvv2rmm9U7srhjpvKo1BcHpQIQYToKt0TOv7soSEY2jGNvaK6i0CgYEA2mGL\n",
"sL36WoHF3x2DZNvknLJGjxPSMmdjjfflFRqxKeP+Sf54C4QH/1hxHe/yl/KMBTfa\n",
"woBl05SrwTnQ7bOeR8VTmzP53JfkECT5I9h/g8vT8dkz5WQXWNDgy61Imq/UmWwm\n",
"DHElGrkF31oy5w6+aZ58Sa5bXhBDYpkUP9+pV5MCgYAW5BCo89i8gg3XKZyxp9Vu\n",
"cVXu/KRsSBWyjXq1oTDDNKUXrB8SVy0/C7lpF83H+OZiTf6XiOxuAYMebLtAbUIi\n",
"+Z/9YC1HWocaPCy02rNyLNhNIUjwtpHAWeX1arMj4VPNtNXs+TdOwDpVfKvEeI2y\n",
"9wO9ifMHgnFxj0MEUcQVtQKBgHg2Mhs8uM+RmEbVjDq9AP9w835XPuIYH6lKyIPx\n",
"iYyxwI0i0xojt/NL0BjWuQgDsCg/MuDWpTbvJAzdsrDmqz5+1SMeXXCc/CIW+D5P\n",
"MwJt9WGwWuzvSBrQAK6d2NWt7K335on6zp4DM8RbdqHSb+bcIza8D/ebpDxmX8s5\n",
"Z5KZAoGAX8u+63w1uy1FLhf48SqmjOqkAjdUZCWEmaim69koAOdTIBSSDOnAqzGu\n",
"wIVdLLzI6xTgbYmfErCwpU2v8MfUWr0BDzjQ9G6c5rhcS1BkfxbeAsC42XaVIgCk\n",
"2sMNMqi6f96jbp4IQI70BpecsnBAUa+VoT57bZRvy0lW26w9tYI=\n",
"-----END RSA PRIVATE KEY-----\n"
);
}

View file

@ -0,0 +1,9 @@
pub mod config;
pub mod fetch;
pub mod http_signature;
pub mod protocol;
pub mod reqwest_shim;
pub mod traits;
/// Mime type for Activitypub data, used for `Accept` and `Content-Type` HTTP headers
pub static FEDERATION_CONTENT_TYPE: &str = "application/activity+json";

View file

@ -0,0 +1,98 @@
// GNU Affero General Public License v3.0
// https://github.com/LemmyNet/activitypub-federation-rust
//! Wrapper for federated structs which handles `@context` field.
//!
//! This wrapper can be used when sending Activitypub data, to automatically add `@context`. It
//! avoids having to repeat the `@context` property on every struct, and getting multiple contexts
//! in nested structs.
//!
//! ```
//! # use activitypub_federation::protocol::context::WithContext;
//! #[derive(serde::Serialize)]
//! struct Note {
//! content: String
//! }
//! let note = Note {
//! content: "Hello world".to_string()
//! };
//! let note_with_context = WithContext::new_default(note);
//! let serialized = serde_json::to_string(&note_with_context)?;
//! assert_eq!(serialized, r#"{"@context":["https://www.w3.org/ns/activitystreams"],"content":"Hello world"}"#);
//! Ok::<(), serde_json::error::Error>(())
//! ```
use crate::federation::{
config::Data, protocol::helper::deserialize_one_or_many, traits::ActivityHandler,
};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use url::Url;
/// Default context used in Activitypub
const DEFAULT_CONTEXT: &str = "https://www.w3.org/ns/activitystreams";
/// Wrapper for federated structs which handles `@context` field.
#[derive(Serialize, Deserialize, Debug)]
pub struct WithContext<T> {
#[serde(rename = "@context")]
#[serde(deserialize_with = "deserialize_one_or_many")]
context: Vec<Value>,
#[serde(flatten)]
inner: T,
}
impl<T> WithContext<T> {
/// Create a new wrapper with the default Activitypub context.
pub fn new_default(inner: T) -> WithContext<T> {
let context = vec![Value::String(DEFAULT_CONTEXT.to_string())];
WithContext::new(inner, context)
}
/// Create new wrapper with custom context. Use this in case you are implementing extensions.
pub fn new(inner: T, context: Vec<Value>) -> WithContext<T> {
WithContext { context, inner }
}
/// Returns the inner `T` object which this `WithContext` object is wrapping
pub fn inner(&self) -> &T {
&self.inner
}
}
#[async_trait::async_trait]
impl<T> ActivityHandler for WithContext<T>
where
T: ActivityHandler + Send + Sync,
{
type DataType = <T as ActivityHandler>::DataType;
type Error = <T as ActivityHandler>::Error;
fn id(&self) -> &Url {
self.inner.id()
}
fn actor(&self) -> &Url {
self.inner.actor()
}
async fn verify(&self, data: &Data<Self::DataType>) -> Result<(), Self::Error> {
self.inner.verify(data).await
}
async fn receive(self, data: &Data<Self::DataType>) -> Result<(), Self::Error> {
self.inner.receive(data).await
}
}
impl<T> Clone for WithContext<T>
where
T: Clone,
{
fn clone(&self) -> Self {
Self {
context: self.context.clone(),
inner: self.inner.clone(),
}
}
}

View file

@ -0,0 +1,120 @@
// GNU Affero General Public License v3.0
// https://github.com/LemmyNet/activitypub-federation-rust
//! Serde deserialization functions which help to receive differently shaped data
use serde::{Deserialize, Deserializer};
/// Deserialize JSON single value or array into Vec.
///
/// Useful if your application can handle multiple values for a field, but another federated
/// platform only sends a single one.
///
/// ```
/// # use activitypub_federation::protocol::helpers::deserialize_one_or_many;
/// # use url::Url;
/// #[derive(serde::Deserialize)]
/// struct Note {
/// #[serde(deserialize_with = "deserialize_one_or_many")]
/// to: Vec<Url>
/// }
///
/// let single: Note = serde_json::from_str(r#"{"to": "https://example.com/u/alice" }"#)?;
/// assert_eq!(single.to.len(), 1);
///
/// let multiple: Note = serde_json::from_str(
/// r#"{"to": [
/// "https://example.com/u/alice",
/// "https://lemmy.ml/u/bob"
/// ]}"#)?;
/// assert_eq!(multiple.to.len(), 2);
/// Ok::<(), anyhow::Error>(())
pub fn deserialize_one_or_many<'de, T, D>(deserializer: D) -> Result<Vec<T>, D::Error>
where
T: Deserialize<'de>,
D: Deserializer<'de>,
{
#[derive(Deserialize)]
#[serde(untagged)]
enum OneOrMany<T> {
One(T),
Many(Vec<T>),
}
let result: OneOrMany<T> = Deserialize::deserialize(deserializer)?;
Ok(match result {
OneOrMany::Many(list) => list,
OneOrMany::One(value) => vec![value],
})
}
/// Deserialize JSON single value or single element array into single value.
///
/// Useful if your application can only handle a single value for a field, but another federated
/// platform sends single value wrapped in array. Fails if array contains multiple items.
///
/// ```
/// # use activitypub_federation::protocol::helpers::deserialize_one;
/// # use url::Url;
/// #[derive(serde::Deserialize)]
/// struct Note {
/// #[serde(deserialize_with = "deserialize_one")]
/// to: Url
/// }
///
/// let note = serde_json::from_str::<Note>(r#"{"to": ["https://example.com/u/alice"] }"#);
/// assert!(note.is_ok());
pub fn deserialize_one<'de, T, D>(deserializer: D) -> Result<T, D::Error>
where
T: Deserialize<'de>,
D: Deserializer<'de>,
{
#[derive(Deserialize)]
#[serde(untagged)]
enum MaybeArray<T> {
Simple(T),
Array([T; 1]),
}
let result: MaybeArray<T> = Deserialize::deserialize(deserializer)?;
Ok(match result {
MaybeArray::Simple(value) => value,
MaybeArray::Array([value]) => value,
})
}
/// Attempts to deserialize item, in case of error falls back to the type's default value.
///
/// Useful for optional fields which are sent with a different type from another platform,
/// eg object instead of array. Should always be used together with `#[serde(default)]`, so
/// that a mssing value doesn't cause an error.
///
/// ```
/// # use activitypub_federation::protocol::helpers::deserialize_skip_error;
/// # use url::Url;
/// #[derive(serde::Deserialize)]
/// struct Note {
/// content: String,
/// #[serde(deserialize_with = "deserialize_skip_error", default)]
/// source: Option<String>
/// }
///
/// let note = serde_json::from_str::<Note>(
/// r#"{
/// "content": "How are you?",
/// "source": {
/// "content": "How are you?",
/// "mediaType": "text/markdown"
/// }
/// }"#);
/// assert_eq!(note.unwrap().source, None);
/// # Ok::<(), anyhow::Error>(())
pub fn deserialize_skip_error<'de, T, D>(deserializer: D) -> Result<T, D::Error>
where
T: Deserialize<'de> + Default,
D: Deserializer<'de>,
{
let value = serde_json::Value::deserialize(deserializer)?;
let inner = T::deserialize(value).unwrap_or_default();
Ok(inner)
}

View file

@ -0,0 +1,82 @@
//! Types of Activity, Actor, Collection, Link, and Object
use parse_display::{Display, FromStr};
use serde::{Deserialize, Serialize};
#[derive(Clone, Debug, Display, FromStr, PartialEq, Serialize, Deserialize, Default)]
pub(crate) enum ActivityType {
Activity,
Accept,
Add,
Announce,
Arrive,
Block,
#[default]
Create,
Delete,
Dislike,
Flag,
Follow,
Ignore,
Invite,
Join,
Leave,
Like,
Listen,
Move,
Offer,
Question,
Read,
Reject,
Remove,
TentativeAccept,
TentativeReject,
Travel,
Undo,
Update,
View,
}
#[derive(Clone, Debug, Display, FromStr, PartialEq, Serialize, Deserialize, Default)]
pub(crate) enum ActorType {
Application,
Group,
Organization,
#[default]
Person,
Service,
}
#[derive(Clone, Debug, Display, FromStr, PartialEq, Serialize, Deserialize, Default)]
pub(crate) enum CollectionType {
Collection,
#[default]
OrderedCollection,
CollectionPage,
OrderedCollectionPage,
}
#[derive(Clone, Debug, Display, FromStr, PartialEq, Serialize, Deserialize, Default)]
pub(crate) enum LinkType {
#[default]
Link,
Mention,
}
#[derive(Clone, Debug, Display, FromStr, PartialEq, Serialize, Deserialize, Default)]
pub(crate) enum ObjectType {
Object,
Article,
Audio,
Document,
Event,
Image,
#[default]
Note,
Page,
Place,
Profile,
Relationship,
Tombstone,
Video,
}

View file

@ -0,0 +1,5 @@
pub mod context;
pub mod helper;
pub mod kind;
pub mod public_key;
pub mod verification;

View file

@ -0,0 +1,39 @@
// GNU Affero General Public License v3.0
// https://github.com/LemmyNet/activitypub-federation-rust
//! Struct which is used to federate actor key for HTTP signatures
use serde::{Deserialize, Serialize};
use url::Url;
/// Public key of actors which is used for HTTP signatures.
///
/// This needs to be federated in the `public_key` field of all actors.
#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct PublicKey {
/// Id of this private key.
pub id: String,
/// ID of the actor that this public key belongs to
pub owner: Url,
/// The actual public key in PEM format
pub public_key_pem: String,
}
impl PublicKey {
/// Create a new [PublicKey] struct for the `owner` with `public_key_pem`.
///
/// It uses an standard key id of `{actor_id}#main-key`
pub fn new(owner: Url, public_key_pem: String) -> Self {
let id = main_key_id(&owner);
PublicKey {
id,
owner,
public_key_pem,
}
}
}
pub fn main_key_id(owner: &Url) -> String {
format!("{}#main-key", &owner)
}

View file

@ -0,0 +1,41 @@
// GNU Affero General Public License v3.0
// https://github.com/LemmyNet/activitypub-federation-rust
//! Verify that received data is valid
use crate::error::Error;
use url::Url;
/// Check that both urls have the same domain. If not, return UrlVerificationError.
///
/// ```
/// # use url::Url;
/// # use activitypub_federation::protocol::verification::verify_domains_match;
/// let a = Url::parse("https://example.com/abc")?;
/// let b = Url::parse("https://sample.net/abc")?;
/// assert!(verify_domains_match(&a, &b).is_err());
/// # Ok::<(), url::ParseError>(())
/// ```
pub fn verify_domains_match(a: &Url, b: &Url) -> Result<(), Error> {
if a.domain() != b.domain() {
return Err(Error::UrlVerificationError("Domains do not match"));
}
Ok(())
}
/// Check that both urls are identical. If not, return UrlVerificationError.
///
/// ```
/// # use url::Url;
/// # use activitypub_federation::protocol::verification::verify_urls_match;
/// let a = Url::parse("https://example.com/abc")?;
/// let b = Url::parse("https://example.com/123")?;
/// assert!(verify_urls_match(&a, &b).is_err());
/// # Ok::<(), url::ParseError>(())
/// ```
pub fn verify_urls_match(a: &Url, b: &Url) -> Result<(), Error> {
if a != b {
return Err(Error::UrlVerificationError("Urls do not match"));
}
Ok(())
}

View file

@ -0,0 +1,136 @@
use crate::error::Error;
use bytes::{BufMut, Bytes, BytesMut};
use futures_core::{ready, stream::BoxStream, Stream};
use pin_project_lite::pin_project;
use reqwest::Response;
use serde::de::DeserializeOwned;
use std::{
future::Future,
marker::PhantomData,
mem,
pin::Pin,
task::{Context, Poll},
};
/// 100KB
const MAX_BODY_SIZE: usize = 102400;
pin_project! {
pub struct BytesFuture {
#[pin]
stream: BoxStream<'static, reqwest::Result<Bytes>>,
limit: usize,
aggregator: BytesMut,
}
}
impl Future for BytesFuture {
type Output = Result<Bytes, Error>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
loop {
let this = self.as_mut().project();
if let Some(chunk) = ready!(this.stream.poll_next(cx))
.transpose()
.map_err(Error::other)?
{
this.aggregator.put(chunk);
if this.aggregator.len() > *this.limit {
return Poll::Ready(Err(Error::ResponseBodyLimit));
}
continue;
}
break;
}
Poll::Ready(Ok(mem::take(&mut self.aggregator).freeze()))
}
}
pin_project! {
pub struct JsonFuture<T> {
_t: PhantomData<T>,
#[pin]
future: BytesFuture,
}
}
impl<T> Future for JsonFuture<T>
where
T: DeserializeOwned,
{
type Output = Result<T, Error>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
let bytes = ready!(this.future.poll(cx))?;
Poll::Ready(serde_json::from_slice(&bytes).map_err(Error::other))
}
}
pin_project! {
pub struct TextFuture {
#[pin]
future: BytesFuture,
}
}
impl Future for TextFuture {
type Output = Result<String, Error>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
let bytes = ready!(this.future.poll(cx))?;
Poll::Ready(String::from_utf8(bytes.to_vec()).map_err(Error::other))
}
}
/// Response shim to work around [an issue in reqwest](https://github.com/seanmonstar/reqwest/issues/1234) (there is an [open pull request](https://github.com/seanmonstar/reqwest/pull/1532) fixing this).
///
/// Reqwest doesn't limit the response body size by default nor does it offer an option to configure one.
/// Since we have to fetch data from untrusted sources, not restricting the maximum size is a DoS hazard for us.
///
/// This shim reimplements the `bytes`, `json`, and `text` functions and restricts the bodies to 100KB.
///
/// TODO: Remove this shim as soon as reqwest gets support for size-limited bodies.
pub trait ResponseExt {
type BytesFuture;
type JsonFuture<T>;
type TextFuture;
/// Size limited version of `bytes` to work around a reqwest issue. Check [`ResponseExt`] docs for details.
fn bytes_limited(self) -> Self::BytesFuture;
/// Size limited version of `json` to work around a reqwest issue. Check [`ResponseExt`] docs for details.
fn json_limited<T>(self) -> Self::JsonFuture<T>;
/// Size limited version of `text` to work around a reqwest issue. Check [`ResponseExt`] docs for details.
fn text_limited(self) -> Self::TextFuture;
}
impl ResponseExt for Response {
type BytesFuture = BytesFuture;
type JsonFuture<T> = JsonFuture<T>;
type TextFuture = TextFuture;
fn bytes_limited(self) -> Self::BytesFuture {
BytesFuture {
stream: Box::pin(self.bytes_stream()),
limit: MAX_BODY_SIZE,
aggregator: BytesMut::new(),
}
}
fn json_limited<T>(self) -> Self::JsonFuture<T> {
JsonFuture {
_t: PhantomData,
future: self.bytes_limited(),
}
}
fn text_limited(self) -> Self::TextFuture {
TextFuture {
future: self.bytes_limited(),
}
}
}

View file

@ -0,0 +1,552 @@
// GNU Affero General Public License v3.0
// https://github.com/LemmyNet/activitypub-federation-rust
//! Traits which need to be implemented for federated data types
use crate::federation::{config::Data, protocol::public_key::PublicKey};
use async_trait::async_trait;
use chrono::NaiveDateTime;
use serde::Deserialize;
use std::ops::Deref;
use url::Url;
/// Helper for converting between database structs and federated protocol structs.
///
/// ```
/// # use activitystreams_kinds::{object::NoteType, public};
/// # use chrono::{Local, NaiveDateTime};
/// # use serde::{Deserialize, Serialize};
/// # use url::Url;
/// # use activitypub_federation::protocol::{public_key::PublicKey, helpers::deserialize_one_or_many};
/// # use activitypub_federation::config::Data;
/// # use activitypub_federation::fetch::object_id::ObjectId;
/// # use activitypub_federation::protocol::verification::verify_domains_match;
/// # use activitypub_federation::traits::{Actor, Object};
/// # use activitypub_federation::traits::tests::{DbConnection, DbUser};
/// #
/// /// How the post is read/written in the local database
/// pub struct DbPost {
/// pub text: String,
/// pub ap_id: ObjectId<DbPost>,
/// pub creator: ObjectId<DbUser>,
/// pub local: bool,
/// }
///
/// /// How the post is serialized and represented as Activitypub JSON
/// #[derive(Deserialize, Serialize, Debug)]
/// #[serde(rename_all = "camelCase")]
/// pub struct Note {
/// #[serde(rename = "type")]
/// kind: NoteType,
/// id: ObjectId<DbPost>,
/// pub(crate) attributed_to: ObjectId<DbUser>,
/// #[serde(deserialize_with = "deserialize_one_or_many")]
/// pub(crate) to: Vec<Url>,
/// content: String,
/// }
///
/// #[async_trait::async_trait]
/// impl Object for DbPost {
/// type DataType = DbConnection;
/// type Kind = Note;
/// type Error = anyhow::Error;
///
/// async fn read_from_id(object_id: Url, data: &Data<Self::DataType>) -> Result<Option<Self>, Self::Error> {
/// // Attempt to read object from local database. Return Ok(None) if not found.
/// let post: Option<DbPost> = data.read_post_from_json_id(object_id).await?;
/// Ok(post)
/// }
///
/// async fn into_json(self, data: &Data<Self::DataType>) -> Result<Self::Kind, Self::Error> {
/// // Called when a local object gets sent out over Activitypub. Simply convert it to the
/// // protocol struct
/// Ok(Note {
/// kind: Default::default(),
/// id: self.ap_id.clone().into(),
/// attributed_to: self.creator,
/// to: vec![public()],
/// content: self.text,
/// })
/// }
///
/// async fn verify(json: &Self::Kind, expected_domain: &Url, data: &Data<Self::DataType>,) -> Result<(), Self::Error> {
/// verify_domains_match(json.id.inner(), expected_domain)?;
/// // additional application specific checks
/// Ok(())
/// }
///
/// async fn from_json(json: Self::Kind, data: &Data<Self::DataType>) -> Result<Self, Self::Error> {
/// // Called when a remote object gets received over Activitypub. Validate and insert it
/// // into the database.
///
/// let post = DbPost {
/// text: json.content,
/// ap_id: json.id,
/// creator: json.attributed_to,
/// local: false,
/// };
///
/// // Here we need to persist the object in the local database. Note that Activitypub
/// // doesnt distinguish between creating and updating an object. Thats why we need to
/// // use upsert functionality.
/// data.upsert(&post).await?;
///
/// Ok(post)
/// }
///
/// }
#[async_trait]
pub trait Object: Sized {
/// App data type passed to handlers. Must be identical to
/// [crate::config::FederationConfigBuilder::app_data] type.
type DataType: Clone + Send + Sync;
/// The type of protocol struct which gets sent over network to federate this database struct.
type Kind;
/// Error type returned by handler methods
type Error;
/// Returns the last time this object was updated.
///
/// If this returns `Some` and the value is too long ago, the object is refetched from the
/// original instance. This should always be implemented for actors, because there is no active
/// update mechanism prescribed. It is possible to send `Update/Person` activities for profile
/// changes, but not all implementations do this, so `last_refreshed_at` is still necessary.
///
/// The object is refetched if `last_refreshed_at` value is more than 24 hours ago. In debug
/// mode this is reduced to 20 seconds.
fn last_refreshed_at(&self) -> Option<NaiveDateTime> {
None
}
/// Try to read the object with given `id` from local database.
///
/// Should return `Ok(None)` if not found.
async fn read_from_id(
object_id: Url,
data: &Data<Self::DataType>,
) -> Result<Option<Self>, Self::Error>;
/// Mark remote object as deleted in local database.
///
/// Called when a `Delete` activity is received, or if fetch returns a `Tombstone` object.
async fn delete(self, _data: &Data<Self::DataType>) -> Result<(), Self::Error> {
Ok(())
}
/// Convert database type to Activitypub type.
///
/// Called when a local object gets fetched by another instance over HTTP, or when an object
/// gets sent in an activity.
async fn into_json(self, data: &Data<Self::DataType>) -> Result<Self::Kind, Self::Error>;
/// Verifies that the received object is valid.
///
/// You should check here that the domain of id matches `expected_domain`. Additionally you
/// should perform any application specific checks.
///
/// It is necessary to use a separate method for this, because it might be used for activities
/// like `Delete/Note`, which shouldn't perform any database write for the inner `Note`.
async fn verify(
json: &Self::Kind,
expected_domain: &Url,
data: &Data<Self::DataType>,
) -> Result<(), Self::Error>;
/// Convert object from ActivityPub type to database type.
///
/// Called when an object is received from HTTP fetch or as part of an activity. This method
/// should write the received object to database. Note that there is no distinction between
/// create and update, so an `upsert` operation should be used.
async fn from_json(json: Self::Kind, data: &Data<Self::DataType>) -> Result<Self, Self::Error>;
}
/// Handler for receiving incoming activities.
///
/// ```rust
/// # use activitystreams_kinds::activity::FollowType;
/// # use url::Url;
/// # use activitypub_federation::fetch::object_id::ObjectId;
/// # use activitypub_federation::config::Data;
/// # use activitypub_federation::traits::ActivityHandler;
/// # use activitypub_federation::traits::tests::{DbConnection, DbUser, DB_LOCAL_USER};
/// #[derive(serde::Deserialize)]
/// struct Follow {
/// actor: ObjectId<DbUser>,
/// object: ObjectId<DbUser>,
/// #[serde(rename = "type")]
/// kind: FollowType,
/// id: Url,
/// }
///
/// #[async_trait::async_trait]
/// impl ActivityHandler for Follow {
/// type DataType = DbConnection;
/// type Error = anyhow::Error;
///
/// fn id(&self) -> &Url {
/// &self.id
/// }
///
/// fn actor(&self) -> &Url {
/// self.actor.inner()
/// }
///
/// async fn verify(&self, data: &Data<Self::DataType>) -> Result<(), Self::Error> {
/// Ok(())
/// }
///
/// async fn receive(self, data: &Data<Self::DataType>) -> Result<(), Self::Error> {
/// let local_user = self.object.dereference(&DB_LOCAL_USER.clone(), data).await?;
/// let follower = self.actor.dereference(&DB_LOCAL_USER.clone(), data).await?;
/// data.add_follower(local_user, follower).await?;
/// Ok(())
/// }
/// }
/// ```
#[async_trait]
#[enum_delegate::register]
pub trait ActivityHandler {
/// App data type passed to handlers. Must be identical to
/// [crate::config::FederationConfigBuilder::app_data] type.
type DataType: Clone + Send + Sync;
/// Error type returned by handler methods
type Error;
/// `id` field of the activity
fn id(&self) -> &Url;
/// `actor` field of activity
fn actor(&self) -> &Url;
/// Verifies that the received activity is valid.
///
/// This needs to be a separate method, because it might be used for activities
/// like `Undo/Follow`, which shouldn't perform any database write for the inner `Follow`.
async fn verify(&self, data: &Data<Self::DataType>) -> Result<(), Self::Error>;
/// Called when an activity is received.
///
/// Should perform validation and possibly write action to the database. In case the activity
/// has a nested `object` field, must call `object.from_json` handler.
async fn receive(self, data: &Data<Self::DataType>) -> Result<(), Self::Error>;
}
/// Trait to allow retrieving common Actor data.
pub trait Actor: Object + Send + 'static {
/// `id` field of the actor
fn id(&self) -> Url;
/// The actor's public key for verifying signatures of incoming activities.
///
/// Use [generate_actor_keypair](crate::http_signatures::generate_actor_keypair) to create the
/// actor keypair.
fn public_key_pem(&self) -> &str;
/// The inbox where activities for this user should be sent to
fn inbox(&self) -> Url;
/// Generates a public key struct for use in the actor json representation
fn public_key(&self) -> PublicKey {
PublicKey::new(self.id(), self.public_key_pem().to_string())
}
/// The actor's shared inbox, if any
fn shared_inbox(&self) -> Option<Url> {
None
}
/// Returns shared inbox if it exists, normal inbox otherwise.
fn shared_inbox_or_inbox(&self) -> Url {
self.shared_inbox().unwrap_or_else(|| self.inbox())
}
}
/// Trait to represent a local actor.
pub trait LocalActor {
/// Federation ID (URL) of the actor
fn federation_id(&self) -> Url;
/// The local actor's private key for signing outgoing activities and requests.
///
/// Use [generate_actor_keypair](crate::http_signatures::generate_actor_keypair) to create the
/// actor keypair.
fn private_key_pem(&self) -> &str;
}
/// Allow for boxing of enum variants
#[async_trait]
impl<T> ActivityHandler for Box<T>
where
T: ActivityHandler + Send + Sync,
{
type DataType = T::DataType;
type Error = T::Error;
fn id(&self) -> &Url {
self.deref().id()
}
fn actor(&self) -> &Url {
self.deref().actor()
}
async fn verify(&self, data: &Data<Self::DataType>) -> Result<(), Self::Error> {
self.deref().verify(data).await
}
async fn receive(self, data: &Data<Self::DataType>) -> Result<(), Self::Error> {
(*self).receive(data).await
}
}
/// Trait for federating collections
#[async_trait]
pub trait Collection: Sized {
/// Actor or object that this collection belongs to
type Owner;
/// App data type passed to handlers. Must be identical to
/// [crate::config::FederationConfigBuilder::app_data] type.
type DataType: Clone + Send + Sync;
/// The type of protocol struct which gets sent over network to federate this database struct.
type Kind: for<'de2> Deserialize<'de2>;
/// Error type returned by handler methods
type Error;
/// Reads local collection from database and returns it as Activitypub JSON.
async fn read_local(
owner: &Self::Owner,
data: &Data<Self::DataType>,
) -> Result<Self::Kind, Self::Error>;
/// Verifies that the received object is valid.
///
/// You should check here that the domain of id matches `expected_domain`. Additionally you
/// should perform any application specific checks.
async fn verify(
json: &Self::Kind,
expected_domain: &Url,
data: &Data<Self::DataType>,
) -> Result<(), Self::Error>;
/// Convert object from ActivityPub type to database type.
///
/// Called when an object is received from HTTP fetch or as part of an activity. This method
/// should also write the received object to database. Note that there is no distinction
/// between create and update, so an `upsert` operation should be used.
async fn from_json(
json: Self::Kind,
owner: &Self::Owner,
data: &Data<Self::DataType>,
) -> Result<Self, Self::Error>;
}
#[cfg(test)]
pub(crate) mod tests {
use super::*;
use crate::federation::{
fetch::object_id::ObjectId,
http_signature::{generate_actor_keypair, Keypair},
protocol::{kind, public_key::PublicKey, verification::verify_domains_match},
};
use anyhow::Error;
use once_cell::sync::Lazy;
use serde::{Deserialize, Serialize};
#[derive(Clone)]
pub struct DbConnection;
#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub(crate) struct PersonActor {
#[serde(rename = "type")]
pub kind: kind::ActorType,
pub preferred_username: String,
pub id: ObjectId<DbUser>,
pub inbox: Url,
pub public_key: PublicKey,
}
#[derive(Debug, Clone)]
pub(crate) struct DbUser {
pub name: String,
pub federation_id: Url,
pub inbox: Url,
pub public_key: String,
#[allow(dead_code)]
private_key: Option<String>,
pub followers: Vec<Url>,
pub local: bool,
}
#[derive(Debug, Clone)]
pub struct DbLocalUser(DbUser);
pub static DB_USER_KEYPAIR: Lazy<Keypair> = Lazy::new(|| generate_actor_keypair().unwrap());
pub static DB_LOCAL_USER_KEYPAIR: Lazy<Keypair> =
Lazy::new(|| generate_actor_keypair().unwrap());
pub(crate) static DB_USER: Lazy<DbUser> = Lazy::new(|| DbUser {
name: String::new(),
federation_id: "https://example.com/123".parse().unwrap(),
inbox: "https://example.com/123/inbox".parse().unwrap(),
public_key: DB_USER_KEYPAIR.public_key.clone(),
private_key: None,
followers: vec![],
local: false,
});
pub static DB_LOCAL_USER: Lazy<DbLocalUser> = Lazy::new(|| {
DbLocalUser(DbUser {
name: String::new(),
federation_id: "https://localhost/456".parse().unwrap(),
inbox: "https://localhost/456/inbox".parse().unwrap(),
public_key: DB_LOCAL_USER_KEYPAIR.public_key.clone(),
private_key: Some(DB_LOCAL_USER_KEYPAIR.private_key.clone()),
followers: vec![],
local: true,
})
});
#[async_trait]
impl Object for DbUser {
type DataType = DbConnection;
type Kind = PersonActor;
type Error = Error;
async fn read_from_id(
_object_id: Url,
_data: &Data<Self::DataType>,
) -> Result<Option<Self>, Self::Error> {
Ok(Some(DB_USER.clone()))
}
async fn into_json(self, _data: &Data<Self::DataType>) -> Result<Self::Kind, Self::Error> {
Ok(PersonActor {
preferred_username: self.name.clone(),
kind: Default::default(),
id: self.federation_id.clone().into(),
inbox: self.inbox.clone(),
public_key: self.public_key(),
})
}
async fn verify(
json: &Self::Kind,
expected_domain: &Url,
_data: &Data<Self::DataType>,
) -> Result<(), Self::Error> {
verify_domains_match(json.id.inner(), expected_domain)?;
Ok(())
}
async fn from_json(
json: Self::Kind,
_data: &Data<Self::DataType>,
) -> Result<Self, Self::Error> {
Ok(DbUser {
name: json.preferred_username,
federation_id: json.id.into(),
inbox: json.inbox,
public_key: json.public_key.public_key_pem,
private_key: None,
followers: vec![],
local: false,
})
}
}
impl Actor for DbUser {
fn id(&self) -> Url {
self.federation_id.clone()
}
fn public_key_pem(&self) -> &str {
&self.public_key
}
fn inbox(&self) -> Url {
self.inbox.clone()
}
}
impl LocalActor for DbLocalUser {
fn federation_id(&self) -> Url {
self.0.federation_id.clone()
}
fn private_key_pem(&self) -> &str {
&self
.0
.private_key
.as_ref()
.expect("Local user must have a private key")
}
}
#[derive(Deserialize, Serialize, Clone, Debug)]
#[serde(rename_all = "camelCase")]
pub(crate) struct Activity {
pub actor: ObjectId<DbUser>,
pub object: ObjectId<DbUser>,
#[serde(rename = "type")]
pub kind: kind::ActivityType,
pub id: Url,
}
#[async_trait]
impl ActivityHandler for Activity {
type DataType = DbConnection;
type Error = Error;
fn id(&self) -> &Url {
&self.id
}
fn actor(&self) -> &Url {
self.actor.inner()
}
async fn verify(&self, _: &Data<Self::DataType>) -> Result<(), Self::Error> {
Ok(())
}
async fn receive(self, _data: &Data<Self::DataType>) -> Result<(), Self::Error> {
Ok(())
}
}
#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct Note {}
#[derive(Debug, Clone)]
pub struct DbPost {}
#[async_trait]
impl Object for DbPost {
type DataType = DbConnection;
type Kind = Note;
type Error = Error;
async fn read_from_id(
_: Url,
_: &Data<Self::DataType>,
) -> Result<Option<Self>, Self::Error> {
todo!()
}
async fn into_json(self, _: &Data<Self::DataType>) -> Result<Self::Kind, Self::Error> {
todo!()
}
async fn verify(
_: &Self::Kind,
_: &Url,
_: &Data<Self::DataType>,
) -> Result<(), Self::Error> {
todo!()
}
async fn from_json(_: Self::Kind, _: &Data<Self::DataType>) -> Result<Self, Self::Error> {
todo!()
}
}
}

View file

@ -0,0 +1,3 @@
pub mod error;
pub mod federation;
pub mod queue;

View file

@ -0,0 +1,183 @@
//! Used to queue sending activity
use crate::{
error::Error,
federation::{
config::Data,
http_signature::sign_request,
reqwest_shim::ResponseExt,
traits::{ActivityHandler, LocalActor},
FEDERATION_CONTENT_TYPE,
},
};
use anyhow::anyhow;
use async_trait::async_trait;
use dyn_clone::{clone_trait_object, DynClone};
use http::{header::HeaderName, HeaderMap, HeaderValue};
use httpdate::fmt_http_date;
use itertools::Itertools;
use reqwest_middleware::ClientWithMiddleware;
use serde::{Deserialize, Serialize};
use std::{
fmt::Debug,
time::{Duration, SystemTime},
};
use tracing::{debug, info, warn};
use url::Url;
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct SendActivityTask {
pub actor_id: Url,
pub activity_id: Url,
pub activity: String,
pub inbox: Url,
pub private_key: String,
pub http_signature_compat: bool,
}
#[async_trait]
pub trait QueueManager: DynClone + Send {
/// Called in [crate::queue::send_activity], and would call
/// [crate::queue::do_send] inside to send activity to remote servers.
async fn queue_deliver(&self, task: SendActivityTask) -> Result<(), Error>;
}
clone_trait_object!(QueueManager);
/// Send a new activity to the given inboxes
///
/// - `activity`: The activity to be sent, gets converted to json
/// - `private_key`: Private key belonging to the actor who sends the activity, for signing HTTP
/// signature. Generated with [crate::http_signatures::generate_actor_keypair].
/// - `inboxes`: List of actor inboxes that should receive the activity. Should be built by calling
/// [crate::federation::traits::Actor::shared_inbox_or_inbox] for each target actor.
pub async fn send_activity<Activity, Datatype, ActorType>(
activity: Activity,
actor: &ActorType,
inboxes: Vec<Url>,
data: &Data<Datatype>,
) -> Result<(), Error>
where
Activity: ActivityHandler + Serialize,
Datatype: Clone,
ActorType: LocalActor,
{
let config = &data.config;
let actor_id = activity.actor();
let activity_id = activity.id();
let activity_serialized = serde_json::to_string_pretty(&activity)?;
let private_key = actor.private_key_pem();
let inboxes: Vec<Url> = inboxes
.into_iter()
.unique()
.filter(|i| !config.is_local_url(i))
.collect();
for inbox in inboxes {
if config.verify_url_valid(&inbox).await.is_err() {
continue;
}
let message = SendActivityTask {
actor_id: actor_id.clone(),
activity_id: activity_id.clone(),
inbox,
activity: activity_serialized.clone(),
private_key: private_key.to_string(),
http_signature_compat: config.http_signature_compat,
};
if config.debug {
let res = do_send(message, &config.client, config.request_timeout).await;
// Don't fail on error, as we intentionally do some invalid actions in tests, to verify that
// they are rejected on the receiving side. These errors shouldn't bubble up to make the API
// call fail. This matches the behaviour in production.
if let Err(e) = res {
warn!("{}", e);
}
} else {
debug!(task = ?message, "Queue sending activity");
data.config.queue_manager.queue_deliver(message).await?;
}
}
Ok(())
}
async fn do_send(
task: SendActivityTask,
client: &ClientWithMiddleware,
timeout: Duration,
) -> Result<(), anyhow::Error> {
debug!("Sending {} to {}", task.activity_id, task.inbox);
let request_builder = client
.post(task.inbox.to_string())
.timeout(timeout)
.headers(generate_request_headers(&task.inbox));
let request = sign_request(
request_builder,
task.actor_id,
task.activity,
task.private_key,
task.http_signature_compat,
)
.await?;
let response = client.execute(request).await;
match response {
Ok(o) if o.status().is_success() => {
info!(
"Activity {} delivered successfully to {}",
task.activity_id, task.inbox
);
Ok(())
}
Ok(o) if o.status().is_client_error() => {
let text = o.text_limited().await.map_err(Error::other)?;
info!(
"Activity {} was rejected by {}, aborting: {}",
task.activity_id, task.inbox, text,
);
Ok(())
}
Ok(o) => {
let status = o.status();
let text = o.text_limited().await.map_err(Error::other)?;
Err(anyhow!(
"Queueing activity {} to {} for retry after failure with status {}: {}",
task.activity_id,
task.inbox,
status,
text,
))
}
Err(e) => {
info!(
"Unable to connect to {}, aborting task {}: {}",
task.inbox, task.activity_id, e
);
Ok(())
}
}
}
pub(crate) fn generate_request_headers(inbox_url: &Url) -> HeaderMap {
let mut host = inbox_url.domain().expect("read inbox domain").to_string();
if let Some(port) = inbox_url.port() {
host = format!("{}:{}", host, port);
}
let mut headers = HeaderMap::new();
headers.insert(
HeaderName::from_static("content-type"),
HeaderValue::from_static(FEDERATION_CONTENT_TYPE),
);
headers.insert(
HeaderName::from_static("host"),
HeaderValue::from_str(&host).expect("Hostname is valid"),
);
headers.insert(
"date",
HeaderValue::from_str(&fmt_http_date(SystemTime::now())).expect("Date is valid"),
);
headers
}