This commit is contained in:
Namekuji 2023-09-09 01:28:07 -04:00
parent 758280a828
commit 7d002ec440
No known key found for this signature in database
GPG key ID: 1D62332C07FBA532
7 changed files with 227 additions and 50 deletions

View file

@ -667,6 +667,18 @@ dependencies = [
"syn 1.0.109",
]
[[package]]
name = "dialoguer"
version = "0.10.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "59c6f2989294b9a498d3ad5491a79c6deb604617378e1cdc4bfc1c1361fe2f87"
dependencies = [
"console",
"shell-words",
"tempfile",
"zeroize",
]
[[package]]
name = "diff"
version = "0.1.13"
@ -2445,6 +2457,7 @@ version = "0.1.0"
dependencies = [
"chrono",
"clap",
"dialoguer",
"futures",
"indicatif",
"scylla",
@ -2764,6 +2777,12 @@ dependencies = [
"lazy_static",
]
[[package]]
name = "shell-words"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "24188a676b6ae68c3b2cb3a01be17fbf7240ce009799bb56d5b1409051e78fde"
[[package]]
name = "signal-hook-registry"
version = "1.4.1"

View file

@ -8,6 +8,7 @@ edition = "2021"
[dependencies]
chrono = "0.4.26"
clap = { version = "4.3.11", features = ["derive"] }
dialoguer = "0.10.4"
futures = "0.3.28"
indicatif = { version = "0.17.6", features = ["tokio"] }
scylla = "0.8.2"

View file

@ -1,30 +0,0 @@
//! `SeaORM` Entity. Generated by sea-orm-codegen 0.12.2
use sea_orm::entity::prelude::*;
#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)]
#[sea_orm(table_name = "emoji")]
pub struct Model {
#[sea_orm(primary_key, auto_increment = false)]
pub id: String,
#[sea_orm(column_name = "updatedAt")]
pub updated_at: Option<DateTimeWithTimeZone>,
pub name: String,
pub host: Option<String>,
#[sea_orm(column_name = "originalUrl")]
pub original_url: String,
pub uri: Option<String>,
pub r#type: Option<String>,
pub aliases: Vec<String>,
pub category: Option<String>,
#[sea_orm(column_name = "publicUrl")]
pub public_url: String,
pub license: Option<String>,
pub width: Option<i32>,
pub height: Option<i32>,
}
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
pub enum Relation {}
impl ActiveModelBehavior for ActiveModel {}

View file

@ -1,9 +1,9 @@
pub mod drive_file;
pub mod emoji;
pub mod following;
pub mod note;
pub mod note_edit;
pub mod note_reaction;
pub mod notification;
pub mod poll;
pub mod poll_vote;
pub mod user;

View file

@ -0,0 +1,41 @@
//! `SeaORM` Entity. Generated by sea-orm-codegen 0.12.2
use super::sea_orm_active_enums::NotificationTypeEnum;
use sea_orm::entity::prelude::*;
#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)]
#[sea_orm(table_name = "notification")]
pub struct Model {
#[sea_orm(primary_key, auto_increment = false)]
pub id: String,
#[sea_orm(column_name = "createdAt")]
pub created_at: DateTimeWithTimeZone,
#[sea_orm(column_name = "notifieeId")]
pub notifiee_id: String,
#[sea_orm(column_name = "notifierId")]
pub notifier_id: Option<String>,
#[sea_orm(column_name = "isRead")]
pub is_read: bool,
#[sea_orm(column_name = "noteId")]
pub note_id: Option<String>,
pub reaction: Option<String>,
pub choice: Option<i32>,
#[sea_orm(column_name = "followRequestId")]
pub follow_request_id: Option<String>,
pub r#type: NotificationTypeEnum,
#[sea_orm(column_name = "userGroupInvitationId")]
pub user_group_invitation_id: Option<String>,
#[sea_orm(column_name = "customBody")]
pub custom_body: Option<String>,
#[sea_orm(column_name = "customHeader")]
pub custom_header: Option<String>,
#[sea_orm(column_name = "customIcon")]
pub custom_icon: Option<String>,
#[sea_orm(column_name = "appAccessTokenId")]
pub app_access_token_id: Option<String>,
}
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
pub enum Relation {}
impl ActiveModelBehavior for ActiveModel {}

View file

@ -11,7 +11,7 @@ use thiserror::Error;
#[derive(Error, Debug)]
pub enum Error {
#[error("Session error: {0}")]
#[error("ScyllaDB Session error: {0}")]
Session(#[from] NewSessionError),
#[error("Query error: {0}")]
Query(#[from] QueryError),

View file

@ -1,18 +1,21 @@
use std::{collections::HashMap, fmt::Write, sync::OnceLock};
use std::collections::HashMap;
use chrono::{DateTime, NaiveDate, Utc};
use dialoguer::{theme::ColorfulTheme, Confirm};
use futures::{future, TryStreamExt};
use indicatif::{ProgressBar, ProgressState, ProgressStyle};
use indicatif::{ProgressBar, ProgressStyle};
use scylla::{
prepared_statement::PreparedStatement, FromUserType, IntoUserType, Session, SessionBuilder,
ValueList,
prepared_statement::PreparedStatement, FromRow, FromUserType, IntoUserType, Session,
SessionBuilder, ValueList,
};
use sea_orm::{entity::*, query::*, Database, DatabaseConnection};
use urlencoding::encode;
use crate::{
config::{DbConfig, ScyllaConfig},
entity::{drive_file, emoji, following, note, note_edit, note_reaction, poll, poll_vote, user},
entity::{
drive_file, following, note, note_edit, note_reaction, notification, poll, poll_vote, user,
},
error::Error,
};
@ -47,11 +50,33 @@ impl Initializer {
}
pub(crate) async fn setup(&self) -> Result<(), Error> {
println!("Several tables in PostgreSQL are going to moved to ScyllaDB.");
let pool = Database::connect(&self.postgres_url).await?;
let db_backend = pool.get_database_backend();
println!(
"{}",
dialoguer::console::style(
"This is irreversible! Please backup your PostgreSQL database before you proceed."
)
.bold()
);
let confirm = Confirm::with_theme(&ColorfulTheme::default())
.with_prompt("This process may take a while. Do you want to continue?")
.interact()
.unwrap_or(false);
if !confirm {
println!("Cancelled.");
return Ok(());
}
println!("Copyping data from PostgreSQL to ScyllaDB.");
self.copy(&pool).await?;
println!("Dropping constraints from PostgreSQL.");
let fk_pairs = vec![
("channel_note_pining", "FK_10b19ef67d297ea9de325cd4502"),
("clip_note", "FK_a012eaf5c87c65da1deb5fdbfa3"),
@ -71,6 +96,7 @@ impl Initializer {
.await?;
}
println!("Dropping tables from PostgreSQL.");
let tables = vec![
"note_reaction",
"note_edit",
@ -94,6 +120,9 @@ impl Initializer {
let note_prepared = self.scylla.prepare(INSERT_NOTE).await?;
let home_prepared = self.scylla.prepare(INSERT_HOME_TIMELINE).await?;
let reaction_prepared = self.scylla.prepare(INSERT_REACTION).await?;
let vote_insert_prepared = self.scylla.prepare(INSERT_POLL_VOTE).await?;
let vote_select_prepared = self.scylla.prepare(SELECT_POLL_VOTE).await?;
let notification_prepared = self.scylla.prepare(INSERT_NOTIFICATION).await?;
let num_notes: i64 = note::Entity::find()
.select_only()
@ -102,7 +131,6 @@ impl Initializer {
.one(db)
.await?
.unwrap_or_default();
let num_reactions: i64 = note_reaction::Entity::find()
.select_only()
.column_as(note_reaction::Column::Id.count(), "count")
@ -110,37 +138,83 @@ impl Initializer {
.one(db)
.await?
.unwrap_or_default();
println!("Copying notes from PostgreSQL to ScyllaDB.");
let num_votes: i64 = poll_vote::Entity::find()
.select_only()
.column_as(poll_vote::Column::Id.count(), "count")
.into_tuple()
.one(db)
.await?
.unwrap_or_default();
let num_notifications: i64 = notification::Entity::find()
.select_only()
.column_as(notification::Column::Id.count(), "count")
.into_tuple()
.one(db)
.await?
.unwrap_or_default();
const PB_TMPL: &str =
"{spinner:.green} [{elapsed_precise}] [{wide_bar:.cyan/blue}] {pos}/{len} ({eta})";
let pb_style = ProgressStyle::with_template(PB_TMPL)
.unwrap()
.progress_chars("#>-");
let note_pb = ProgressBar::new(num_notes as u64).with_style(pb_style.to_owned());
let reaction_pb = ProgressBar::new(num_reactions as u64).with_style(pb_style.to_owned());
let vote_pb = ProgressBar::new(num_votes as u64).with_style(pb_style.to_owned());
let notification_pb =
ProgressBar::new(num_notifications as u64).with_style(pb_style.to_owned());
let mut note_tasks = Vec::new();
let mut notes = note::Entity::find()
.order_by_asc(note::Column::Id)
.stream(db)
.await?;
let mut note_tasks = Vec::new();
while let Some(note) = notes.try_next().await? {
note_tasks.push(self.copy_note(note, db, &note_prepared, &home_prepared, &note_pb));
}
let mut reaction_tasks = Vec::new();
let mut reactions = note_reaction::Entity::find()
.order_by_asc(note_reaction::Column::Id)
.stream(db)
.await?;
let mut reaction_tasks = Vec::new();
while let Some(reaction) = reactions.try_next().await? {
reaction_tasks.push(self.copy_reaction(reaction, &reaction_prepared, &reaction_pb));
}
let mut vote_tasks = Vec::new();
let mut votes = poll_vote::Entity::find()
.order_by_asc(poll_vote::Column::Id)
.stream(db)
.await?;
while let Some(vote) = votes.try_next().await? {
vote_tasks.push(self.copy_vote(
vote,
db,
&vote_select_prepared,
&vote_insert_prepared,
&vote_pb,
));
}
let mut notification_tasks = Vec::new();
let mut notifications = notification::Entity::find()
.order_by_asc(notification::Column::Id)
.stream(db)
.await?;
while let Some(n) = notifications.try_next().await? {
notification_tasks.push(self.copy_notification(
n,
db,
&notification_prepared,
&notification_pb,
));
}
future::try_join_all(note_tasks).await?;
future::try_join_all(reaction_tasks).await?;
future::try_join_all(vote_tasks).await?;
future::try_join_all(notification_tasks).await?;
Ok(())
}
@ -149,8 +223,8 @@ impl Initializer {
&self,
note: note::Model,
db: &DatabaseConnection,
prepared_note: &PreparedStatement,
prepared_home: &PreparedStatement,
insert_note: &PreparedStatement,
insert_home: &PreparedStatement,
pb: &ProgressBar,
) -> Result<(), Error> {
let reply = match &note.reply_id {
@ -249,7 +323,7 @@ impl Initializer {
};
self.scylla
.execute(prepared_note, scylla_note.to_owned())
.execute(insert_note, scylla_note.to_owned())
.await?;
let mut home_tasks = Vec::new();
@ -307,7 +381,7 @@ impl Initializer {
note_edit: s_note.note_edit,
updated_at: s_note.updated_at,
};
home_tasks.push(self.scylla.execute(prepared_home, home));
home_tasks.push(self.scylla.execute(insert_home, home));
}
future::try_join_all(home_tasks).await?;
@ -318,7 +392,7 @@ impl Initializer {
async fn copy_reaction(
&self,
reaction: note_reaction::Model,
prepared: &PreparedStatement,
insert: &PreparedStatement,
pb: &ProgressBar,
) -> Result<(), Error> {
let scylla_reaction = ReactionTable {
@ -328,7 +402,77 @@ impl Initializer {
reaction: reaction.reaction,
created_at: reaction.created_at.into(),
};
self.scylla.execute(prepared, scylla_reaction).await?;
self.scylla.execute(insert, scylla_reaction).await?;
pb.inc(1);
Ok(())
}
async fn copy_vote(
&self,
vote: poll_vote::Model,
db: &DatabaseConnection,
select: &PreparedStatement,
insert: &PreparedStatement,
pb: &ProgressBar,
) -> Result<(), Error> {
let voted_user = user::Entity::find_by_id(&vote.user_id).one(db).await?;
if let Some(u) = voted_user {
let mut scylla_vote = PollVoteTable {
note_id: vote.note_id.to_owned(),
user_id: vote.user_id.to_owned(),
user_host: u.host,
choice: vec![vote.choice],
created_at: vote.created_at.into(),
};
if let Ok(row) = self
.scylla
.execute(select, (vote.note_id, vote.user_id))
.await?
.first_row()
{
let mut s_vote: PollVoteTable = row.into_typed()?;
scylla_vote.choice.append(&mut s_vote.choice);
scylla_vote.choice.dedup();
}
self.scylla.execute(insert, scylla_vote).await?;
}
pb.inc(1);
Ok(())
}
async fn copy_notification(
&self,
model: notification::Model,
db: &DatabaseConnection,
insert: &PreparedStatement,
pb: &ProgressBar,
) -> Result<(), Error> {
let notifier = match model.notifier_id.to_owned() {
None => None,
Some(id) => user::Entity::find_by_id(&id).one(db).await?,
};
let s_notification = NotificationTable {
target_id: model.notifiee_id,
created_at_date: model.created_at.date_naive(),
created_at: model.created_at.into(),
id: model.id,
notifier_id: model.notifier_id,
notifier_host: notifier.map(|n| n.host).flatten(),
r#type: model.r#type.to_value(),
entity_id: model
.note_id
.or(model.follow_request_id)
.or(model.user_group_invitation_id)
.or(model.app_access_token_id),
reatcion: model.reaction,
choice: model.choice,
custom_body: model.custom_body,
custom_header: model.custom_header,
custom_icon: model.custom_icon,
};
self.scylla.execute(insert, s_notification).await?;
pb.inc(1);
Ok(())
@ -617,7 +761,7 @@ struct ReactionTable {
const INSERT_REACTION: &str = r#"INSERT INTO reaction ("id", "noteId", "userId", "reaction", "createdAt") VALUES (?, ?, ?, ?, ?)"#;
#[derive(ValueList)]
#[derive(ValueList, FromRow)]
struct PollVoteTable {
note_id: String,
user_id: String,
@ -627,6 +771,7 @@ struct PollVoteTable {
}
const INSERT_POLL_VOTE: &str = r#"INSERT INTO poll_vote ("noteId", "userId", "userHost", "choice", "createdAt") VALUES (?, ?, ?, ?, ?)"#;
const SELECT_POLL_VOTE: &str = r#"SELECT "noteId", "userId", "userHost", "choice", "createdAt" FROM poll_vote WHERE "noteId" = ? AND "userId" = ?"#;
#[derive(ValueList)]
struct NotificationTable {
@ -641,6 +786,7 @@ struct NotificationTable {
reatcion: Option<String>,
choice: Option<i32>,
custom_body: Option<String>,
custom_header: Option<String>,
custom_icon: Option<String>,
}