diff --git a/package.json b/package.json index b6c392844c..e13da1902b 100644 --- a/package.json +++ b/package.json @@ -16,7 +16,7 @@ "init": "pnpm run migrate", "migrate": "pnpm --filter backend run migrate", "scylla:migrate": "pnpm --filter native-utils run scylla:migrate", - "scylla:setup": "pnpm --filter native-utils run scylla:setup && pnpm run scylla:migrate", + "scylla:setup": "pnpm run scylla:migrate && pnpm --filter native-utils run scylla:setup", "revertmigration": "pnpm --filter backend run revertmigration", "migrateandstart": "pnpm run migrate && pnpm run start", "gulp": "gulp build", diff --git a/packages/backend/native-utils/Cargo.lock b/packages/backend/native-utils/Cargo.lock index a10f37ee3e..f4d5ca4dfc 100644 --- a/packages/backend/native-utils/Cargo.lock +++ b/packages/backend/native-utils/Cargo.lock @@ -1215,6 +1215,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0b297dc40733f23a0e52728a58fa9489a5b7638a324932de16b41adc3ef80730" dependencies = [ "console", + "futures-core", "instant", "number_prefix", "portable-atomic", diff --git a/packages/backend/native-utils/package.json b/packages/backend/native-utils/package.json index 407a2797b3..98510f922e 100644 --- a/packages/backend/native-utils/package.json +++ b/packages/backend/native-utils/package.json @@ -38,8 +38,8 @@ "build:napi": "napi build --features napi --platform --release ./built/", "build:migration": "cargo build --locked --release --manifest-path ./migration/Cargo.toml && cp -v ./target/release/migration ./built/migration", "build:debug": "napi build --features napi --platform ./built/ && cargo build --locked --manifest-path ./migration/Cargo.toml && cp -v ./target/debug/migration ./built/migration", - "scylla:migrate": "cargo run --release --locked --manifest-path ./scylla-migration/Cargo.toml -- -d ./scylla-migration/cql -c ../../../.config/default.yml up", - "scylla:setup": "cargo run --release --locked --manifest-path ./scylla-migration/Cargo.toml -- -d ./scylla-migration/cql -c ../../../.config/default.yml setup", + "scylla:migrate": "cargo run --locked --manifest-path ./scylla-migration/Cargo.toml -- -d ./scylla-migration/cql -c ../../../.config/default.yml up", + "scylla:setup": "cargo run --locked --manifest-path ./scylla-migration/Cargo.toml -- -d ./scylla-migration/cql -c ../../../.config/default.yml setup", "prepublishOnly": "napi prepublish -t npm", "test": "pnpm run cargo:test && pnpm run build:napi && ava", "universal": "napi universal", diff --git a/packages/backend/native-utils/scylla-migration/Cargo.toml b/packages/backend/native-utils/scylla-migration/Cargo.toml index ccb2f60f7b..eaa0b74be3 100644 --- a/packages/backend/native-utils/scylla-migration/Cargo.toml +++ b/packages/backend/native-utils/scylla-migration/Cargo.toml @@ -10,7 +10,7 @@ chrono = "0.4.26" clap = { version = "4.3.11", features = ["derive"] } dialoguer = "0.10.4" futures = "0.3.28" -indicatif = { version = "0.17.6", features = ["tokio"] } +indicatif = { version = "0.17.6", features = ["tokio", "futures"] } scylla = "0.8.2" sea-orm = { version = "0.12.2", features = ["sqlx-postgres", "runtime-tokio-rustls"] } serde = { version = "1.0.171", features = ["derive"] } diff --git a/packages/backend/native-utils/scylla-migration/src/setup.rs b/packages/backend/native-utils/scylla-migration/src/setup.rs index dd78d7e499..a6e7eeb3a1 100644 --- a/packages/backend/native-utils/scylla-migration/src/setup.rs +++ b/packages/backend/native-utils/scylla-migration/src/setup.rs @@ -1,9 +1,10 @@ use std::collections::HashMap; +use std::sync::Arc; use chrono::{DateTime, NaiveDate, Utc}; use dialoguer::{theme::ColorfulTheme, Confirm}; -use futures::{future, TryStreamExt}; -use indicatif::{ProgressBar, ProgressStyle}; +use futures::{future, TryFutureExt, TryStreamExt}; +use indicatif::{MultiProgress, ProgressBar, ProgressStyle}; use scylla::{ prepared_statement::PreparedStatement, FromRow, FromUserType, IntoUserType, Session, SessionBuilder, ValueList, @@ -19,8 +20,9 @@ use crate::{ error::Error, }; +#[derive(Clone)] pub(crate) struct Initializer { - scylla: Session, + scylla: Arc, postgres_url: String, } @@ -33,6 +35,7 @@ impl Initializer { .known_nodes(&scylla_conf.nodes) .build() .await?; + session.use_keyspace(&scylla_conf.keyspace, true).await?; let conn_url = format!( "postgres://{}:{}@{}:{}/{}", @@ -44,7 +47,7 @@ impl Initializer { ); Ok(Self { - scylla: session, + scylla: Arc::new(session), postgres_url: conn_url, }) } @@ -74,7 +77,9 @@ impl Initializer { } println!("Copyping data from PostgreSQL to ScyllaDB."); - self.copy(&pool).await?; + self.copy(pool.clone()).await?; + + return Ok(()); println!("Dropping constraints from PostgreSQL."); let fk_pairs = vec![ @@ -116,7 +121,7 @@ impl Initializer { Ok(()) } - async fn copy(&self, db: &DatabaseConnection) -> Result<(), Error> { + async fn copy(&self, db: DatabaseConnection) -> Result<(), Error> { let note_prepared = self.scylla.prepare(INSERT_NOTE).await?; let home_prepared = self.scylla.prepare(INSERT_HOME_TIMELINE).await?; let reaction_prepared = self.scylla.prepare(INSERT_REACTION).await?; @@ -128,93 +133,120 @@ impl Initializer { .select_only() .column_as(note::Column::Id.count(), "count") .into_tuple() - .one(db) + .one(&db) .await? .unwrap_or_default(); + println!("Posts: {num_notes}"); let num_reactions: i64 = note_reaction::Entity::find() .select_only() .column_as(note_reaction::Column::Id.count(), "count") .into_tuple() - .one(db) + .one(&db) .await? .unwrap_or_default(); + println!("Reactions: {num_reactions}"); let num_votes: i64 = poll_vote::Entity::find() .select_only() .column_as(poll_vote::Column::Id.count(), "count") .into_tuple() - .one(db) + .one(&db) .await? .unwrap_or_default(); + println!("Votes: {num_votes}"); let num_notifications: i64 = notification::Entity::find() .select_only() .column_as(notification::Column::Id.count(), "count") .into_tuple() - .one(db) + .one(&db) .await? .unwrap_or_default(); + println!("Notifications: {num_notifications}"); const PB_TMPL: &str = - "{spinner:.green} [{elapsed_precise}] [{wide_bar:.cyan/blue}] {pos}/{len} ({eta})"; + "{spinner:.green} {prefix} [{elapsed_precise}] [{wide_bar:.cyan/blue}] {pos}/{len} ({eta})"; let pb_style = ProgressStyle::with_template(PB_TMPL) .unwrap() .progress_chars("#>-"); - let note_pb = ProgressBar::new(num_notes as u64).with_style(pb_style.to_owned()); - let reaction_pb = ProgressBar::new(num_reactions as u64).with_style(pb_style.to_owned()); - let vote_pb = ProgressBar::new(num_votes as u64).with_style(pb_style.to_owned()); + let mp = MultiProgress::new(); + let note_pb = mp.add(ProgressBar::new(num_notes as u64).with_style(pb_style.to_owned()).with_prefix("Posts")); + let reaction_pb = + mp.add(ProgressBar::new(num_reactions as u64).with_style(pb_style.to_owned()).with_prefix("Reactions")); + let vote_pb = mp.add(ProgressBar::new(num_votes as u64).with_style(pb_style.to_owned()).with_prefix("Votes")); let notification_pb = - ProgressBar::new(num_notifications as u64).with_style(pb_style.to_owned()); + mp.add(ProgressBar::new(num_notifications as u64).with_style(pb_style.to_owned()).with_prefix("Notifications")); + + let mut tasks = Vec::new(); - let mut note_tasks = Vec::new(); let mut notes = note::Entity::find() .order_by_asc(note::Column::Id) - .stream(db) + .stream(&db) .await?; while let Some(note) = notes.try_next().await? { - note_tasks.push(self.copy_note(note, db, ¬e_prepared, &home_prepared, ¬e_pb)); + let (s, d, n, h, p) = ( + self.clone(), + db.clone(), + note_prepared.clone(), + home_prepared.clone(), + note_pb.clone(), + ); + let handler = tokio::spawn(async move { + let _ = s.copy_note(note, d, h, n).await; + p.inc(1); + }); + tasks.push(handler); } - let mut reaction_tasks = Vec::new(); let mut reactions = note_reaction::Entity::find() .order_by_asc(note_reaction::Column::Id) - .stream(db) + .stream(&db) .await?; while let Some(reaction) = reactions.try_next().await? { - reaction_tasks.push(self.copy_reaction(reaction, &reaction_prepared, &reaction_pb)); + let (s, r, p) = (self.clone(), reaction_prepared.clone(), reaction_pb.clone()); + let handler = tokio::spawn(async move { + let _ = s.copy_reaction(reaction, r).await; + p.inc(1); + }); + tasks.push(handler); } - let mut vote_tasks = Vec::new(); let mut votes = poll_vote::Entity::find() .order_by_asc(poll_vote::Column::Id) - .stream(db) + .stream(&db) .await?; while let Some(vote) = votes.try_next().await? { - vote_tasks.push(self.copy_vote( - vote, - db, - &vote_select_prepared, - &vote_insert_prepared, - &vote_pb, - )); + let (s, d, sp, ip, p) = ( + self.clone(), + db.clone(), + vote_select_prepared.clone(), + vote_insert_prepared.clone(), + vote_pb.clone(), + ); + let handler = tokio::spawn(async move { + let _ = s.copy_vote(vote, d, sp, ip).await; + p.inc(1); + }); + tasks.push(handler); } - let mut notification_tasks = Vec::new(); let mut notifications = notification::Entity::find() .order_by_asc(notification::Column::Id) - .stream(db) + .stream(&db) .await?; while let Some(n) = notifications.try_next().await? { - notification_tasks.push(self.copy_notification( - n, - db, - ¬ification_prepared, - ¬ification_pb, - )); + let (s, d, ps, p) = ( + self.clone(), + db.clone(), + notification_prepared.clone(), + notification_pb.clone(), + ); + let handler = tokio::spawn(async move { + let _ = s.copy_notification(n, d, ps).await; + p.inc(1); + }); + tasks.push(handler); } - future::try_join_all(note_tasks).await?; - future::try_join_all(reaction_tasks).await?; - future::try_join_all(vote_tasks).await?; - future::try_join_all(notification_tasks).await?; + future::join_all(tasks).await; Ok(()) } @@ -222,25 +254,24 @@ impl Initializer { async fn copy_note( &self, note: note::Model, - db: &DatabaseConnection, - insert_note: &PreparedStatement, - insert_home: &PreparedStatement, - pb: &ProgressBar, + db: DatabaseConnection, + insert_note: PreparedStatement, + insert_home: PreparedStatement, ) -> Result<(), Error> { let reply = match ¬e.reply_id { None => None, - Some(id) => note::Entity::find_by_id(id).one(db).await?, + Some(id) => note::Entity::find_by_id(id).one(&db).await?, }; let renote = match ¬e.renote_id { None => None, - Some(id) => note::Entity::find_by_id(id).one(db).await?, + Some(id) => note::Entity::find_by_id(id).one(&db).await?, }; - let files = get_attached_files(Some(note.to_owned()), db).await?; - let reply_files = get_attached_files(reply.to_owned(), db).await?; - let renote_files = get_attached_files(renote.to_owned(), db).await?; + let files = get_attached_files(Some(note.to_owned()), &db).await?; + let reply_files = get_attached_files(reply.to_owned(), &db).await?; + let renote_files = get_attached_files(renote.to_owned(), &db).await?; - let note_poll = note.find_related(poll::Entity).one(db).await?; + let note_poll = note.find_related(poll::Entity).one(&db).await?; let poll_type = match note_poll { None => None, Some(v) => Some(PollType { @@ -262,13 +293,13 @@ impl Initializer { .map(|(k, v)| (k.to_string(), v.as_i64().unwrap_or_default() as i32)), ), }; - let edits = note.find_related(note_edit::Entity).all(db).await?; + let edits = note.find_related(note_edit::Entity).all(&db).await?; let edits = edits.iter().map(|v| async { let v = v.to_owned(); Ok(NoteEditHistoryType { content: v.text, cw: v.cw, - files: get_files(v.file_ids, db).await?, + files: get_files(v.file_ids, &db).await?, updated_at: v.updated_at.into(), }) }); @@ -323,7 +354,7 @@ impl Initializer { }; self.scylla - .execute(insert_note, scylla_note.to_owned()) + .execute(&insert_note, scylla_note.to_owned()) .await?; let mut home_tasks = Vec::new(); @@ -333,7 +364,7 @@ impl Initializer { .filter(following::Column::FolloweeId.eq(note.user_id)) .filter(following::Column::FollowerHost.is_null()) .into_tuple::() - .stream(db) + .stream(&db) .await?; while let Some(follower_id) = local_followers.try_next().await? { @@ -381,19 +412,17 @@ impl Initializer { note_edit: s_note.note_edit, updated_at: s_note.updated_at, }; - home_tasks.push(self.scylla.execute(insert_home, home)); + home_tasks.push(self.scylla.execute(&insert_home, home)); } future::try_join_all(home_tasks).await?; - pb.inc(1); Ok(()) } async fn copy_reaction( &self, reaction: note_reaction::Model, - insert: &PreparedStatement, - pb: &ProgressBar, + insert: PreparedStatement, ) -> Result<(), Error> { let scylla_reaction = ReactionTable { id: reaction.id, @@ -402,21 +431,19 @@ impl Initializer { reaction: reaction.reaction, created_at: reaction.created_at.into(), }; - self.scylla.execute(insert, scylla_reaction).await?; + self.scylla.execute(&insert, scylla_reaction).await?; - pb.inc(1); Ok(()) } async fn copy_vote( &self, vote: poll_vote::Model, - db: &DatabaseConnection, - select: &PreparedStatement, - insert: &PreparedStatement, - pb: &ProgressBar, + db: DatabaseConnection, + select: PreparedStatement, + insert: PreparedStatement, ) -> Result<(), Error> { - let voted_user = user::Entity::find_by_id(&vote.user_id).one(db).await?; + let voted_user = user::Entity::find_by_id(&vote.user_id).one(&db).await?; if let Some(u) = voted_user { let mut scylla_vote = PollVoteTable { note_id: vote.note_id.to_owned(), @@ -427,7 +454,7 @@ impl Initializer { }; if let Ok(row) = self .scylla - .execute(select, (vote.note_id, vote.user_id)) + .execute(&select, (vote.note_id, vote.user_id)) .await? .first_row() { @@ -435,23 +462,21 @@ impl Initializer { scylla_vote.choice.append(&mut s_vote.choice); scylla_vote.choice.dedup(); } - self.scylla.execute(insert, scylla_vote).await?; + self.scylla.execute(&insert, scylla_vote).await?; } - pb.inc(1); Ok(()) } async fn copy_notification( &self, model: notification::Model, - db: &DatabaseConnection, - insert: &PreparedStatement, - pb: &ProgressBar, + db: DatabaseConnection, + insert: PreparedStatement, ) -> Result<(), Error> { let notifier = match model.notifier_id.to_owned() { None => None, - Some(id) => user::Entity::find_by_id(&id).one(db).await?, + Some(id) => user::Entity::find_by_id(&id).one(&db).await?, }; let s_notification = NotificationTable { target_id: model.notifiee_id, @@ -472,9 +497,8 @@ impl Initializer { custom_header: model.custom_header, custom_icon: model.custom_icon, }; - self.scylla.execute(insert, s_notification).await?; + self.scylla.execute(&insert, s_notification).await?; - pb.inc(1); Ok(()) } }