From 7d002ec440c60e01423e5506b7f3bf5042c52563 Mon Sep 17 00:00:00 2001 From: Namekuji Date: Sat, 9 Sep 2023 01:28:07 -0400 Subject: [PATCH] add copy --- packages/backend/native-utils/Cargo.lock | 19 ++ .../native-utils/scylla-migration/Cargo.toml | 1 + .../scylla-migration/src/entity/emoji.rs | 30 --- .../scylla-migration/src/entity/mod.rs | 2 +- .../src/entity/notification.rs | 41 ++++ .../scylla-migration/src/error.rs | 2 +- .../scylla-migration/src/setup.rs | 182 ++++++++++++++++-- 7 files changed, 227 insertions(+), 50 deletions(-) delete mode 100644 packages/backend/native-utils/scylla-migration/src/entity/emoji.rs create mode 100644 packages/backend/native-utils/scylla-migration/src/entity/notification.rs diff --git a/packages/backend/native-utils/Cargo.lock b/packages/backend/native-utils/Cargo.lock index 27e1cfef50..a10f37ee3e 100644 --- a/packages/backend/native-utils/Cargo.lock +++ b/packages/backend/native-utils/Cargo.lock @@ -667,6 +667,18 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "dialoguer" +version = "0.10.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "59c6f2989294b9a498d3ad5491a79c6deb604617378e1cdc4bfc1c1361fe2f87" +dependencies = [ + "console", + "shell-words", + "tempfile", + "zeroize", +] + [[package]] name = "diff" version = "0.1.13" @@ -2445,6 +2457,7 @@ version = "0.1.0" dependencies = [ "chrono", "clap", + "dialoguer", "futures", "indicatif", "scylla", @@ -2764,6 +2777,12 @@ dependencies = [ "lazy_static", ] +[[package]] +name = "shell-words" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "24188a676b6ae68c3b2cb3a01be17fbf7240ce009799bb56d5b1409051e78fde" + [[package]] name = "signal-hook-registry" version = "1.4.1" diff --git a/packages/backend/native-utils/scylla-migration/Cargo.toml b/packages/backend/native-utils/scylla-migration/Cargo.toml index e23e73b45e..ccb2f60f7b 100644 --- a/packages/backend/native-utils/scylla-migration/Cargo.toml +++ b/packages/backend/native-utils/scylla-migration/Cargo.toml @@ -8,6 +8,7 @@ edition = "2021" [dependencies] chrono = "0.4.26" clap = { version = "4.3.11", features = ["derive"] } +dialoguer = "0.10.4" futures = "0.3.28" indicatif = { version = "0.17.6", features = ["tokio"] } scylla = "0.8.2" diff --git a/packages/backend/native-utils/scylla-migration/src/entity/emoji.rs b/packages/backend/native-utils/scylla-migration/src/entity/emoji.rs deleted file mode 100644 index 95e78c5dd8..0000000000 --- a/packages/backend/native-utils/scylla-migration/src/entity/emoji.rs +++ /dev/null @@ -1,30 +0,0 @@ -//! `SeaORM` Entity. Generated by sea-orm-codegen 0.12.2 - -use sea_orm::entity::prelude::*; - -#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)] -#[sea_orm(table_name = "emoji")] -pub struct Model { - #[sea_orm(primary_key, auto_increment = false)] - pub id: String, - #[sea_orm(column_name = "updatedAt")] - pub updated_at: Option, - pub name: String, - pub host: Option, - #[sea_orm(column_name = "originalUrl")] - pub original_url: String, - pub uri: Option, - pub r#type: Option, - pub aliases: Vec, - pub category: Option, - #[sea_orm(column_name = "publicUrl")] - pub public_url: String, - pub license: Option, - pub width: Option, - pub height: Option, -} - -#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] -pub enum Relation {} - -impl ActiveModelBehavior for ActiveModel {} diff --git a/packages/backend/native-utils/scylla-migration/src/entity/mod.rs b/packages/backend/native-utils/scylla-migration/src/entity/mod.rs index 55aae352d1..0ad165a8ed 100644 --- a/packages/backend/native-utils/scylla-migration/src/entity/mod.rs +++ b/packages/backend/native-utils/scylla-migration/src/entity/mod.rs @@ -1,9 +1,9 @@ pub mod drive_file; -pub mod emoji; pub mod following; pub mod note; pub mod note_edit; pub mod note_reaction; +pub mod notification; pub mod poll; pub mod poll_vote; pub mod user; diff --git a/packages/backend/native-utils/scylla-migration/src/entity/notification.rs b/packages/backend/native-utils/scylla-migration/src/entity/notification.rs new file mode 100644 index 0000000000..c90f0c7890 --- /dev/null +++ b/packages/backend/native-utils/scylla-migration/src/entity/notification.rs @@ -0,0 +1,41 @@ +//! `SeaORM` Entity. Generated by sea-orm-codegen 0.12.2 + +use super::sea_orm_active_enums::NotificationTypeEnum; +use sea_orm::entity::prelude::*; + +#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)] +#[sea_orm(table_name = "notification")] +pub struct Model { + #[sea_orm(primary_key, auto_increment = false)] + pub id: String, + #[sea_orm(column_name = "createdAt")] + pub created_at: DateTimeWithTimeZone, + #[sea_orm(column_name = "notifieeId")] + pub notifiee_id: String, + #[sea_orm(column_name = "notifierId")] + pub notifier_id: Option, + #[sea_orm(column_name = "isRead")] + pub is_read: bool, + #[sea_orm(column_name = "noteId")] + pub note_id: Option, + pub reaction: Option, + pub choice: Option, + #[sea_orm(column_name = "followRequestId")] + pub follow_request_id: Option, + pub r#type: NotificationTypeEnum, + #[sea_orm(column_name = "userGroupInvitationId")] + pub user_group_invitation_id: Option, + #[sea_orm(column_name = "customBody")] + pub custom_body: Option, + #[sea_orm(column_name = "customHeader")] + pub custom_header: Option, + #[sea_orm(column_name = "customIcon")] + pub custom_icon: Option, + #[sea_orm(column_name = "appAccessTokenId")] + pub app_access_token_id: Option, +} + +#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] +pub enum Relation {} + +impl ActiveModelBehavior for ActiveModel {} diff --git a/packages/backend/native-utils/scylla-migration/src/error.rs b/packages/backend/native-utils/scylla-migration/src/error.rs index 22881c0cc5..5ef0028a1e 100644 --- a/packages/backend/native-utils/scylla-migration/src/error.rs +++ b/packages/backend/native-utils/scylla-migration/src/error.rs @@ -11,7 +11,7 @@ use thiserror::Error; #[derive(Error, Debug)] pub enum Error { - #[error("Session error: {0}")] + #[error("ScyllaDB Session error: {0}")] Session(#[from] NewSessionError), #[error("Query error: {0}")] Query(#[from] QueryError), diff --git a/packages/backend/native-utils/scylla-migration/src/setup.rs b/packages/backend/native-utils/scylla-migration/src/setup.rs index 6c3d590ab2..dd78d7e499 100644 --- a/packages/backend/native-utils/scylla-migration/src/setup.rs +++ b/packages/backend/native-utils/scylla-migration/src/setup.rs @@ -1,18 +1,21 @@ -use std::{collections::HashMap, fmt::Write, sync::OnceLock}; +use std::collections::HashMap; use chrono::{DateTime, NaiveDate, Utc}; +use dialoguer::{theme::ColorfulTheme, Confirm}; use futures::{future, TryStreamExt}; -use indicatif::{ProgressBar, ProgressState, ProgressStyle}; +use indicatif::{ProgressBar, ProgressStyle}; use scylla::{ - prepared_statement::PreparedStatement, FromUserType, IntoUserType, Session, SessionBuilder, - ValueList, + prepared_statement::PreparedStatement, FromRow, FromUserType, IntoUserType, Session, + SessionBuilder, ValueList, }; use sea_orm::{entity::*, query::*, Database, DatabaseConnection}; use urlencoding::encode; use crate::{ config::{DbConfig, ScyllaConfig}, - entity::{drive_file, emoji, following, note, note_edit, note_reaction, poll, poll_vote, user}, + entity::{ + drive_file, following, note, note_edit, note_reaction, notification, poll, poll_vote, user, + }, error::Error, }; @@ -47,11 +50,33 @@ impl Initializer { } pub(crate) async fn setup(&self) -> Result<(), Error> { + println!("Several tables in PostgreSQL are going to moved to ScyllaDB."); + let pool = Database::connect(&self.postgres_url).await?; let db_backend = pool.get_database_backend(); + println!( + "{}", + dialoguer::console::style( + "This is irreversible! Please backup your PostgreSQL database before you proceed." + ) + .bold() + ); + + let confirm = Confirm::with_theme(&ColorfulTheme::default()) + .with_prompt("This process may take a while. Do you want to continue?") + .interact() + .unwrap_or(false); + + if !confirm { + println!("Cancelled."); + return Ok(()); + } + + println!("Copyping data from PostgreSQL to ScyllaDB."); self.copy(&pool).await?; + println!("Dropping constraints from PostgreSQL."); let fk_pairs = vec![ ("channel_note_pining", "FK_10b19ef67d297ea9de325cd4502"), ("clip_note", "FK_a012eaf5c87c65da1deb5fdbfa3"), @@ -71,6 +96,7 @@ impl Initializer { .await?; } + println!("Dropping tables from PostgreSQL."); let tables = vec![ "note_reaction", "note_edit", @@ -94,6 +120,9 @@ impl Initializer { let note_prepared = self.scylla.prepare(INSERT_NOTE).await?; let home_prepared = self.scylla.prepare(INSERT_HOME_TIMELINE).await?; let reaction_prepared = self.scylla.prepare(INSERT_REACTION).await?; + let vote_insert_prepared = self.scylla.prepare(INSERT_POLL_VOTE).await?; + let vote_select_prepared = self.scylla.prepare(SELECT_POLL_VOTE).await?; + let notification_prepared = self.scylla.prepare(INSERT_NOTIFICATION).await?; let num_notes: i64 = note::Entity::find() .select_only() @@ -102,7 +131,6 @@ impl Initializer { .one(db) .await? .unwrap_or_default(); - let num_reactions: i64 = note_reaction::Entity::find() .select_only() .column_as(note_reaction::Column::Id.count(), "count") @@ -110,37 +138,83 @@ impl Initializer { .one(db) .await? .unwrap_or_default(); - - println!("Copying notes from PostgreSQL to ScyllaDB."); + let num_votes: i64 = poll_vote::Entity::find() + .select_only() + .column_as(poll_vote::Column::Id.count(), "count") + .into_tuple() + .one(db) + .await? + .unwrap_or_default(); + let num_notifications: i64 = notification::Entity::find() + .select_only() + .column_as(notification::Column::Id.count(), "count") + .into_tuple() + .one(db) + .await? + .unwrap_or_default(); const PB_TMPL: &str = "{spinner:.green} [{elapsed_precise}] [{wide_bar:.cyan/blue}] {pos}/{len} ({eta})"; let pb_style = ProgressStyle::with_template(PB_TMPL) .unwrap() .progress_chars("#>-"); - let note_pb = ProgressBar::new(num_notes as u64).with_style(pb_style.to_owned()); let reaction_pb = ProgressBar::new(num_reactions as u64).with_style(pb_style.to_owned()); + let vote_pb = ProgressBar::new(num_votes as u64).with_style(pb_style.to_owned()); + let notification_pb = + ProgressBar::new(num_notifications as u64).with_style(pb_style.to_owned()); + let mut note_tasks = Vec::new(); let mut notes = note::Entity::find() .order_by_asc(note::Column::Id) .stream(db) .await?; - let mut note_tasks = Vec::new(); while let Some(note) = notes.try_next().await? { note_tasks.push(self.copy_note(note, db, ¬e_prepared, &home_prepared, ¬e_pb)); } + + let mut reaction_tasks = Vec::new(); let mut reactions = note_reaction::Entity::find() .order_by_asc(note_reaction::Column::Id) .stream(db) .await?; - let mut reaction_tasks = Vec::new(); while let Some(reaction) = reactions.try_next().await? { reaction_tasks.push(self.copy_reaction(reaction, &reaction_prepared, &reaction_pb)); } + let mut vote_tasks = Vec::new(); + let mut votes = poll_vote::Entity::find() + .order_by_asc(poll_vote::Column::Id) + .stream(db) + .await?; + while let Some(vote) = votes.try_next().await? { + vote_tasks.push(self.copy_vote( + vote, + db, + &vote_select_prepared, + &vote_insert_prepared, + &vote_pb, + )); + } + + let mut notification_tasks = Vec::new(); + let mut notifications = notification::Entity::find() + .order_by_asc(notification::Column::Id) + .stream(db) + .await?; + while let Some(n) = notifications.try_next().await? { + notification_tasks.push(self.copy_notification( + n, + db, + ¬ification_prepared, + ¬ification_pb, + )); + } + future::try_join_all(note_tasks).await?; future::try_join_all(reaction_tasks).await?; + future::try_join_all(vote_tasks).await?; + future::try_join_all(notification_tasks).await?; Ok(()) } @@ -149,8 +223,8 @@ impl Initializer { &self, note: note::Model, db: &DatabaseConnection, - prepared_note: &PreparedStatement, - prepared_home: &PreparedStatement, + insert_note: &PreparedStatement, + insert_home: &PreparedStatement, pb: &ProgressBar, ) -> Result<(), Error> { let reply = match ¬e.reply_id { @@ -249,7 +323,7 @@ impl Initializer { }; self.scylla - .execute(prepared_note, scylla_note.to_owned()) + .execute(insert_note, scylla_note.to_owned()) .await?; let mut home_tasks = Vec::new(); @@ -307,7 +381,7 @@ impl Initializer { note_edit: s_note.note_edit, updated_at: s_note.updated_at, }; - home_tasks.push(self.scylla.execute(prepared_home, home)); + home_tasks.push(self.scylla.execute(insert_home, home)); } future::try_join_all(home_tasks).await?; @@ -318,7 +392,7 @@ impl Initializer { async fn copy_reaction( &self, reaction: note_reaction::Model, - prepared: &PreparedStatement, + insert: &PreparedStatement, pb: &ProgressBar, ) -> Result<(), Error> { let scylla_reaction = ReactionTable { @@ -328,7 +402,77 @@ impl Initializer { reaction: reaction.reaction, created_at: reaction.created_at.into(), }; - self.scylla.execute(prepared, scylla_reaction).await?; + self.scylla.execute(insert, scylla_reaction).await?; + + pb.inc(1); + Ok(()) + } + + async fn copy_vote( + &self, + vote: poll_vote::Model, + db: &DatabaseConnection, + select: &PreparedStatement, + insert: &PreparedStatement, + pb: &ProgressBar, + ) -> Result<(), Error> { + let voted_user = user::Entity::find_by_id(&vote.user_id).one(db).await?; + if let Some(u) = voted_user { + let mut scylla_vote = PollVoteTable { + note_id: vote.note_id.to_owned(), + user_id: vote.user_id.to_owned(), + user_host: u.host, + choice: vec![vote.choice], + created_at: vote.created_at.into(), + }; + if let Ok(row) = self + .scylla + .execute(select, (vote.note_id, vote.user_id)) + .await? + .first_row() + { + let mut s_vote: PollVoteTable = row.into_typed()?; + scylla_vote.choice.append(&mut s_vote.choice); + scylla_vote.choice.dedup(); + } + self.scylla.execute(insert, scylla_vote).await?; + } + + pb.inc(1); + Ok(()) + } + + async fn copy_notification( + &self, + model: notification::Model, + db: &DatabaseConnection, + insert: &PreparedStatement, + pb: &ProgressBar, + ) -> Result<(), Error> { + let notifier = match model.notifier_id.to_owned() { + None => None, + Some(id) => user::Entity::find_by_id(&id).one(db).await?, + }; + let s_notification = NotificationTable { + target_id: model.notifiee_id, + created_at_date: model.created_at.date_naive(), + created_at: model.created_at.into(), + id: model.id, + notifier_id: model.notifier_id, + notifier_host: notifier.map(|n| n.host).flatten(), + r#type: model.r#type.to_value(), + entity_id: model + .note_id + .or(model.follow_request_id) + .or(model.user_group_invitation_id) + .or(model.app_access_token_id), + reatcion: model.reaction, + choice: model.choice, + custom_body: model.custom_body, + custom_header: model.custom_header, + custom_icon: model.custom_icon, + }; + self.scylla.execute(insert, s_notification).await?; pb.inc(1); Ok(()) @@ -617,7 +761,7 @@ struct ReactionTable { const INSERT_REACTION: &str = r#"INSERT INTO reaction ("id", "noteId", "userId", "reaction", "createdAt") VALUES (?, ?, ?, ?, ?)"#; -#[derive(ValueList)] +#[derive(ValueList, FromRow)] struct PollVoteTable { note_id: String, user_id: String, @@ -627,6 +771,7 @@ struct PollVoteTable { } const INSERT_POLL_VOTE: &str = r#"INSERT INTO poll_vote ("noteId", "userId", "userHost", "choice", "createdAt") VALUES (?, ?, ?, ?, ?)"#; +const SELECT_POLL_VOTE: &str = r#"SELECT "noteId", "userId", "userHost", "choice", "createdAt" FROM poll_vote WHERE "noteId" = ? AND "userId" = ?"#; #[derive(ValueList)] struct NotificationTable { @@ -641,6 +786,7 @@ struct NotificationTable { reatcion: Option, choice: Option, custom_body: Option, + custom_header: Option, custom_icon: Option, }