diff --git a/package.json b/package.json index e13da1902b..47a2ba20d7 100644 --- a/package.json +++ b/package.json @@ -16,7 +16,7 @@ "init": "pnpm run migrate", "migrate": "pnpm --filter backend run migrate", "scylla:migrate": "pnpm --filter native-utils run scylla:migrate", - "scylla:setup": "pnpm run scylla:migrate && pnpm --filter native-utils run scylla:setup", + "scylla:setup": "pnpm --filter native-utils run scylla:setup", "revertmigration": "pnpm --filter backend run revertmigration", "migrateandstart": "pnpm run migrate && pnpm run start", "gulp": "gulp build", diff --git a/packages/backend/native-utils/package.json b/packages/backend/native-utils/package.json index 98510f922e..407a2797b3 100644 --- a/packages/backend/native-utils/package.json +++ b/packages/backend/native-utils/package.json @@ -38,8 +38,8 @@ "build:napi": "napi build --features napi --platform --release ./built/", "build:migration": "cargo build --locked --release --manifest-path ./migration/Cargo.toml && cp -v ./target/release/migration ./built/migration", "build:debug": "napi build --features napi --platform ./built/ && cargo build --locked --manifest-path ./migration/Cargo.toml && cp -v ./target/debug/migration ./built/migration", - "scylla:migrate": "cargo run --locked --manifest-path ./scylla-migration/Cargo.toml -- -d ./scylla-migration/cql -c ../../../.config/default.yml up", - "scylla:setup": "cargo run --locked --manifest-path ./scylla-migration/Cargo.toml -- -d ./scylla-migration/cql -c ../../../.config/default.yml setup", + "scylla:migrate": "cargo run --release --locked --manifest-path ./scylla-migration/Cargo.toml -- -d ./scylla-migration/cql -c ../../../.config/default.yml up", + "scylla:setup": "cargo run --release --locked --manifest-path ./scylla-migration/Cargo.toml -- -d ./scylla-migration/cql -c ../../../.config/default.yml setup", "prepublishOnly": "napi prepublish -t npm", "test": "pnpm run cargo:test && pnpm run build:napi && ava", "universal": "napi universal", diff --git a/packages/backend/native-utils/scylla-migration/src/cli.rs b/packages/backend/native-utils/scylla-migration/src/cli.rs index 0832747630..c43bd7a680 100644 --- a/packages/backend/native-utils/scylla-migration/src/cli.rs +++ b/packages/backend/native-utils/scylla-migration/src/cli.rs @@ -41,9 +41,9 @@ pub async fn run_cli() -> Result<(), Error> { .down(num) .await? } - MigrationCommand::Setup => { + MigrationCommand::Setup { multi_thread } => { let initializer = Initializer::new(&scylla_conf, &config.db).await?; - initializer.setup().await?; + initializer.setup(multi_thread).await?; } _ => {} }; @@ -115,5 +115,15 @@ pub(crate) enum MigrationCommand { num: u32, }, #[clap(about = "Set up PostgreSQL and ScyllaDB", display_order = 40)] - Setup, + Setup { + #[clap( + value_parser, + short, + long, + default_value = "false", + help = "Enable multi-thread mode (WARNING: High memory consumption)", + display_order = 41 + )] + multi_thread: bool, + }, } diff --git a/packages/backend/native-utils/scylla-migration/src/entity/mod.rs b/packages/backend/native-utils/scylla-migration/src/entity/mod.rs index 0ad165a8ed..5f21d2a6a0 100644 --- a/packages/backend/native-utils/scylla-migration/src/entity/mod.rs +++ b/packages/backend/native-utils/scylla-migration/src/entity/mod.rs @@ -6,5 +6,5 @@ pub mod note_reaction; pub mod notification; pub mod poll; pub mod poll_vote; -pub mod user; pub mod sea_orm_active_enums; +pub mod user; diff --git a/packages/backend/native-utils/scylla-migration/src/entity/note.rs b/packages/backend/native-utils/scylla-migration/src/entity/note.rs index 441b350ff9..f18d812f5e 100644 --- a/packages/backend/native-utils/scylla-migration/src/entity/note.rs +++ b/packages/backend/native-utils/scylla-migration/src/entity/note.rs @@ -93,5 +93,4 @@ impl Related for Entity { } } - impl ActiveModelBehavior for ActiveModel {} diff --git a/packages/backend/native-utils/scylla-migration/src/error.rs b/packages/backend/native-utils/scylla-migration/src/error.rs index 5ef0028a1e..d53d1bf1ca 100644 --- a/packages/backend/native-utils/scylla-migration/src/error.rs +++ b/packages/backend/native-utils/scylla-migration/src/error.rs @@ -11,13 +11,13 @@ use thiserror::Error; #[derive(Error, Debug)] pub enum Error { - #[error("ScyllaDB Session error: {0}")] + #[error("ScyllaDB session error: {0}")] Session(#[from] NewSessionError), - #[error("Query error: {0}")] + #[error("ScyllaDB query error: {0}")] Query(#[from] QueryError), - #[error("Conversion error: {0}")] + #[error("ScyllaDB conversion error: {0}")] Conversion(#[from] FromRowError), - #[error("Row error: {0}")] + #[error("ScyllaDB row error: {0}")] Row(#[from] SingleRowTypedError), #[error("File error: {0}")] File(#[from] io::Error), diff --git a/packages/backend/native-utils/scylla-migration/src/main.rs b/packages/backend/native-utils/scylla-migration/src/main.rs index 257901d316..9b21f0515c 100644 --- a/packages/backend/native-utils/scylla-migration/src/main.rs +++ b/packages/backend/native-utils/scylla-migration/src/main.rs @@ -2,6 +2,12 @@ use scylla_migration::{cli::run_cli, error::Error}; #[tokio::main] async fn main() -> Result<(), Error> { + let default_panic = std::panic::take_hook(); + std::panic::set_hook(Box::new(move |info| { + default_panic(info); + std::process::exit(1); + })); + run_cli().await?; Ok(()) diff --git a/packages/backend/native-utils/scylla-migration/src/setup.rs b/packages/backend/native-utils/scylla-migration/src/setup.rs index a6e7eeb3a1..ec6a0bd7f2 100644 --- a/packages/backend/native-utils/scylla-migration/src/setup.rs +++ b/packages/backend/native-utils/scylla-migration/src/setup.rs @@ -3,7 +3,7 @@ use std::sync::Arc; use chrono::{DateTime, NaiveDate, Utc}; use dialoguer::{theme::ColorfulTheme, Confirm}; -use futures::{future, TryFutureExt, TryStreamExt}; +use futures::{future, TryStreamExt}; use indicatif::{MultiProgress, ProgressBar, ProgressStyle}; use scylla::{ prepared_statement::PreparedStatement, FromRow, FromUserType, IntoUserType, Session, @@ -52,7 +52,7 @@ impl Initializer { }) } - pub(crate) async fn setup(&self) -> Result<(), Error> { + pub(crate) async fn setup(&self, mt: bool) -> Result<(), Error> { println!("Several tables in PostgreSQL are going to moved to ScyllaDB."); let pool = Database::connect(&self.postgres_url).await?; @@ -77,9 +77,7 @@ impl Initializer { } println!("Copyping data from PostgreSQL to ScyllaDB."); - self.copy(pool.clone()).await?; - - return Ok(()); + self.copy(pool.clone(), mt).await?; println!("Dropping constraints from PostgreSQL."); let fk_pairs = vec![ @@ -121,13 +119,13 @@ impl Initializer { Ok(()) } - async fn copy(&self, db: DatabaseConnection) -> Result<(), Error> { - let note_prepared = self.scylla.prepare(INSERT_NOTE).await?; - let home_prepared = self.scylla.prepare(INSERT_HOME_TIMELINE).await?; - let reaction_prepared = self.scylla.prepare(INSERT_REACTION).await?; - let vote_insert_prepared = self.scylla.prepare(INSERT_POLL_VOTE).await?; - let vote_select_prepared = self.scylla.prepare(SELECT_POLL_VOTE).await?; - let notification_prepared = self.scylla.prepare(INSERT_NOTIFICATION).await?; + async fn copy(&self, db: DatabaseConnection, multi_thread: bool) -> Result<(), Error> { + let note_prepared = Arc::new(self.scylla.prepare(INSERT_NOTE).await?); + let home_prepared = Arc::new(self.scylla.prepare(INSERT_HOME_TIMELINE).await?); + let reaction_prepared = Arc::new(self.scylla.prepare(INSERT_REACTION).await?); + let vote_insert_prepared = Arc::new(self.scylla.prepare(INSERT_POLL_VOTE).await?); + let vote_select_prepared = Arc::new(self.scylla.prepare(SELECT_POLL_VOTE).await?); + let notification_prepared = Arc::new(self.scylla.prepare(INSERT_NOTIFICATION).await?); let num_notes: i64 = note::Entity::find() .select_only() @@ -168,12 +166,26 @@ impl Initializer { .unwrap() .progress_chars("#>-"); let mp = MultiProgress::new(); - let note_pb = mp.add(ProgressBar::new(num_notes as u64).with_style(pb_style.to_owned()).with_prefix("Posts")); - let reaction_pb = - mp.add(ProgressBar::new(num_reactions as u64).with_style(pb_style.to_owned()).with_prefix("Reactions")); - let vote_pb = mp.add(ProgressBar::new(num_votes as u64).with_style(pb_style.to_owned()).with_prefix("Votes")); - let notification_pb = - mp.add(ProgressBar::new(num_notifications as u64).with_style(pb_style.to_owned()).with_prefix("Notifications")); + let note_pb = mp.add( + ProgressBar::new(num_notes as u64) + .with_style(pb_style.to_owned()) + .with_prefix("Posts"), + ); + let reaction_pb = mp.add( + ProgressBar::new(num_reactions as u64) + .with_style(pb_style.to_owned()) + .with_prefix("Reactions"), + ); + let vote_pb = mp.add( + ProgressBar::new(num_votes as u64) + .with_style(pb_style.to_owned()) + .with_prefix("Votes"), + ); + let notification_pb = mp.add( + ProgressBar::new(num_notifications as u64) + .with_style(pb_style.to_owned()) + .with_prefix("Notifications"), + ); let mut tasks = Vec::new(); @@ -189,11 +201,16 @@ impl Initializer { home_prepared.clone(), note_pb.clone(), ); - let handler = tokio::spawn(async move { - let _ = s.copy_note(note, d, h, n).await; + let f = async move { + s.copy_note(note, d, n, h).await.expect("Note copy failed"); p.inc(1); - }); - tasks.push(handler); + }; + if multi_thread { + let handler = tokio::spawn(f); + tasks.push(handler); + } else { + (|| f)().await; + } } let mut reactions = note_reaction::Entity::find() @@ -202,11 +219,18 @@ impl Initializer { .await?; while let Some(reaction) = reactions.try_next().await? { let (s, r, p) = (self.clone(), reaction_prepared.clone(), reaction_pb.clone()); - let handler = tokio::spawn(async move { - let _ = s.copy_reaction(reaction, r).await; + let f = async move { + s.copy_reaction(reaction, r) + .await + .expect("Reaction copy failed"); p.inc(1); - }); - tasks.push(handler); + }; + if multi_thread { + let handler = tokio::spawn(f); + tasks.push(handler); + } else { + (|| f)().await; + } } let mut votes = poll_vote::Entity::find() @@ -221,11 +245,18 @@ impl Initializer { vote_insert_prepared.clone(), vote_pb.clone(), ); - let handler = tokio::spawn(async move { - let _ = s.copy_vote(vote, d, sp, ip).await; + let f = async move { + s.copy_vote(vote, d, sp, ip) + .await + .expect("Vote copy failed"); p.inc(1); - }); - tasks.push(handler); + }; + if multi_thread { + let handler = tokio::spawn(f); + tasks.push(handler); + } else { + (|| f)().await; + } } let mut notifications = notification::Entity::find() @@ -239,11 +270,18 @@ impl Initializer { notification_prepared.clone(), notification_pb.clone(), ); - let handler = tokio::spawn(async move { - let _ = s.copy_notification(n, d, ps).await; + let f = async move { + s.copy_notification(n, d, ps) + .await + .expect("Notification copy failed"); p.inc(1); - }); - tasks.push(handler); + }; + if multi_thread { + let handler = tokio::spawn(f); + tasks.push(handler); + } else { + (|| f)().await; + } } future::join_all(tasks).await; @@ -255,8 +293,8 @@ impl Initializer { &self, note: note::Model, db: DatabaseConnection, - insert_note: PreparedStatement, - insert_home: PreparedStatement, + insert_note: Arc, + insert_home: Arc, ) -> Result<(), Error> { let reply = match ¬e.reply_id { None => None, @@ -313,7 +351,7 @@ impl Initializer { let scylla_note = NoteTable { created_at_date: note.created_at.date_naive(), created_at: note.created_at.into(), - id: note.id.to_owned(), + id: note.id.clone(), visibility: note.visibility.to_value(), content: note.text, name: note.name, @@ -334,8 +372,8 @@ impl Initializer { poll: poll_type, thread_id: note.thread_id, channel_id: note.channel_id, - user_id: note.user_id.to_owned(), - user_host: note.user_host, + user_id: note.user_id.clone(), + user_host: note.user_host.unwrap_or("local".to_string()), reply_id: note.reply_id, reply_user_id: note.reply_user_id, reply_user_host: note.reply_user_host, @@ -358,6 +396,54 @@ impl Initializer { .await?; let mut home_tasks = Vec::new(); + let s_note = scylla_note.clone(); + let mut home = HomeTimelineTable { + feed_user_id: s_note.user_id.clone(), + created_at_date: s_note.created_at_date, + created_at: s_note.created_at, + id: s_note.id, + visibility: s_note.visibility, + content: s_note.content, + name: s_note.name, + cw: s_note.cw, + local_only: s_note.local_only, + renote_count: s_note.renote_count, + replies_count: s_note.replies_count, + uri: s_note.uri, + url: s_note.url, + score: s_note.score, + files: s_note.files, + visible_user_ids: s_note.visible_user_ids, + mentions: s_note.mentions, + mentioned_remote_users: s_note.mentioned_remote_users, + emojis: s_note.emojis, + tags: s_note.tags, + has_poll: s_note.has_poll, + poll: s_note.poll, + thread_id: s_note.thread_id, + channel_id: s_note.channel_id, + user_id: s_note.user_id, + user_host: s_note.user_host.clone(), + reply_id: s_note.reply_id, + reply_user_id: s_note.reply_user_id, + reply_user_host: s_note.reply_user_host, + reply_content: s_note.reply_content, + reply_cw: s_note.reply_cw, + reply_files: s_note.reply_files, + renote_id: s_note.renote_id, + renote_user_id: s_note.renote_user_id, + renote_user_host: s_note.renote_user_host, + renote_content: s_note.renote_content, + renote_cw: s_note.renote_cw, + renote_files: s_note.renote_files, + reactions: s_note.reactions, + note_edit: s_note.note_edit, + updated_at: s_note.updated_at, + }; + if s_note.user_host == "local" { + home_tasks.push(self.scylla.execute(&insert_home, home.clone())); + } + let mut local_followers = following::Entity::find() .select_only() .column(following::Column::FollowerId) @@ -366,53 +452,9 @@ impl Initializer { .into_tuple::() .stream(&db) .await?; - while let Some(follower_id) = local_followers.try_next().await? { - let s_note = scylla_note.to_owned(); - let home = HomeTimelineTable { - feed_user_id: follower_id, - created_at_date: s_note.created_at_date, - created_at: s_note.created_at, - id: s_note.id, - visibility: s_note.visibility, - content: s_note.content, - name: s_note.name, - cw: s_note.cw, - local_only: s_note.local_only, - renote_count: s_note.renote_count, - replies_count: s_note.replies_count, - uri: s_note.uri, - url: s_note.url, - score: s_note.score, - files: s_note.files, - visible_user_ids: s_note.visible_user_ids, - mentions: s_note.mentions, - mentioned_remote_users: s_note.mentioned_remote_users, - emojis: s_note.emojis, - tags: s_note.tags, - has_poll: s_note.has_poll, - poll: s_note.poll, - thread_id: s_note.thread_id, - channel_id: s_note.channel_id, - user_id: s_note.user_id, - user_host: s_note.user_host, - reply_id: s_note.reply_id, - reply_user_id: s_note.reply_user_id, - reply_user_host: s_note.reply_user_host, - reply_content: s_note.reply_content, - reply_cw: s_note.reply_cw, - reply_files: s_note.reply_files, - renote_id: s_note.renote_id, - renote_user_id: s_note.renote_user_id, - renote_user_host: s_note.renote_user_host, - renote_content: s_note.renote_content, - renote_cw: s_note.renote_cw, - renote_files: s_note.renote_files, - reactions: s_note.reactions, - note_edit: s_note.note_edit, - updated_at: s_note.updated_at, - }; - home_tasks.push(self.scylla.execute(&insert_home, home)); + home.feed_user_id = follower_id; + home_tasks.push(self.scylla.execute(&insert_home, home.clone())); } future::try_join_all(home_tasks).await?; @@ -422,7 +464,7 @@ impl Initializer { async fn copy_reaction( &self, reaction: note_reaction::Model, - insert: PreparedStatement, + insert: Arc, ) -> Result<(), Error> { let scylla_reaction = ReactionTable { id: reaction.id, @@ -440,8 +482,8 @@ impl Initializer { &self, vote: poll_vote::Model, db: DatabaseConnection, - select: PreparedStatement, - insert: PreparedStatement, + select: Arc, + insert: Arc, ) -> Result<(), Error> { let voted_user = user::Entity::find_by_id(&vote.user_id).one(&db).await?; if let Some(u) = voted_user { @@ -472,7 +514,7 @@ impl Initializer { &self, model: notification::Model, db: DatabaseConnection, - insert: PreparedStatement, + insert: Arc, ) -> Result<(), Error> { let notifier = match model.notifier_id.to_owned() { None => None, @@ -620,7 +662,7 @@ struct NoteTable { thread_id: Option, channel_id: Option, user_id: String, - user_host: Option, + user_host: String, reply_id: Option, reply_user_id: Option, reply_user_host: Option, @@ -683,7 +725,7 @@ INSERT INTO note ( ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) "#; -#[derive(ValueList)] +#[derive(Clone, ValueList)] struct HomeTimelineTable { feed_user_id: String, created_at_date: NaiveDate, @@ -710,7 +752,7 @@ struct HomeTimelineTable { thread_id: Option, channel_id: Option, user_id: String, - user_host: Option, + user_host: String, reply_id: Option, reply_user_id: Option, reply_user_host: Option,