From a683766eec33ae393587dcc976bbddcff092f68c Mon Sep 17 00:00:00 2001 From: Namekuji Date: Mon, 25 Sep 2023 16:02:13 -0400 Subject: [PATCH 1/7] add note_since option --- .../native-utils/scylla-migration/src/cli.rs | 6 ++-- .../scylla-migration/src/setup.rs | 34 +++++++++++-------- 2 files changed, 23 insertions(+), 17 deletions(-) diff --git a/packages/backend/native-utils/scylla-migration/src/cli.rs b/packages/backend/native-utils/scylla-migration/src/cli.rs index c772dcb504..5fbc5e2dd4 100644 --- a/packages/backend/native-utils/scylla-migration/src/cli.rs +++ b/packages/backend/native-utils/scylla-migration/src/cli.rs @@ -41,9 +41,9 @@ pub async fn run_cli() -> Result<(), Error> { .down(num) .await? } - MigrationCommand::Setup { threads } => { + MigrationCommand::Setup { threads , note_since } => { let initializer = Initializer::new(&scylla_conf, &config.db).await?; - initializer.setup(threads).await?; + initializer.setup(threads, note_since).await?; } _ => {} }; @@ -125,5 +125,7 @@ pub(crate) enum MigrationCommand { display_order = 41 )] threads: u32, + #[clap(value_parser, long, help = "Note ID to begin copying")] + note_since: Option, }, } diff --git a/packages/backend/native-utils/scylla-migration/src/setup.rs b/packages/backend/native-utils/scylla-migration/src/setup.rs index a952703c13..aeaab504e6 100644 --- a/packages/backend/native-utils/scylla-migration/src/setup.rs +++ b/packages/backend/native-utils/scylla-migration/src/setup.rs @@ -32,14 +32,12 @@ impl Initializer { scylla_conf: &ScyllaConfig, postgres_conf: &DbConfig, ) -> Result { - - let mut builder = SessionBuilder::new() - .known_nodes(&scylla_conf.nodes); + let mut builder = SessionBuilder::new().known_nodes(&scylla_conf.nodes); match &scylla_conf.credentials { Some(credentials) => { builder = builder.user(&credentials.username, &credentials.password); - }, + } None => {} } @@ -61,7 +59,7 @@ impl Initializer { }) } - pub(crate) async fn setup(&self, threads: u32) -> Result<(), Error> { + pub(crate) async fn setup(&self, threads: u32, since: Option) -> Result<(), Error> { println!("Several tables in PostgreSQL are going to be moved to ScyllaDB."); let pool = Database::connect(&self.postgres_url).await?; @@ -86,7 +84,7 @@ impl Initializer { } println!("Copying data from PostgreSQL to ScyllaDB."); - self.copy(pool.clone(), threads.try_into().unwrap_or(1)) + self.copy(pool.clone(), threads.try_into().unwrap_or(1), since) .await?; println!("Dropping constraints from PostgreSQL."); @@ -129,7 +127,12 @@ impl Initializer { Ok(()) } - async fn copy(&self, db: DatabaseConnection, threads: usize) -> Result<(), Error> { + async fn copy( + &self, + db: DatabaseConnection, + threads: usize, + note_since: Option, + ) -> Result<(), Error> { let note_prepared = Arc::new(self.scylla.prepare(INSERT_NOTE).await?); let home_prepared = Arc::new(self.scylla.prepare(INSERT_HOME_TIMELINE).await?); let reaction_prepared = Arc::new(self.scylla.prepare(INSERT_REACTION).await?); @@ -200,10 +203,12 @@ impl Initializer { let mut tasks = Vec::new(); let sem = Arc::new(Semaphore::new(threads)); - let mut notes = note::Entity::find() - .order_by_asc(note::Column::Id) - .stream(&db) - .await?; + let mut notes = note::Entity::find().order_by_asc(note::Column::Id); + if let Some(since_id) = note_since { + notes = notes.filter(note::Column::Id.gt(&since_id)); + } + let mut notes = notes.stream(&db).await?; + while let Some(note) = notes.try_next().await? { if let Ok(permit) = Arc::clone(&sem).acquire_owned().await { let (s, d, n, h, p) = ( @@ -419,7 +424,6 @@ impl Initializer { .execute(&insert_note, scylla_note.to_owned()) .await?; - let mut home_tasks = Vec::new(); let s_note = scylla_note.clone(); let mut home = HomeTimelineTable { feed_user_id: s_note.user_id.clone(), @@ -466,7 +470,7 @@ impl Initializer { updated_at: s_note.updated_at, }; if s_note.user_host == "local" { - home_tasks.push(self.scylla.execute(&insert_home, home.clone())); + self.scylla.execute(&insert_home, home.clone()).await?; } let mut local_followers = following::Entity::find() @@ -479,9 +483,9 @@ impl Initializer { .await?; while let Some(follower_id) = local_followers.try_next().await? { home.feed_user_id = follower_id; - home_tasks.push(self.scylla.execute(&insert_home, home.clone())); + // Ignore even if ScyllaDB timeouts so that we can try to insert the note to the remaining timelines. + let _ = self.scylla.execute(&insert_home, home.clone()).await; } - future::try_join_all(home_tasks).await?; Ok(()) } From cdf5a31fd2b264c925df067bce6a2c593161b3c1 Mon Sep 17 00:00:00 2001 From: Namekuji Date: Mon, 25 Sep 2023 17:02:03 -0400 Subject: [PATCH 2/7] add note-skip option --- .../native-utils/scylla-migration/src/cli.rs | 23 ++++++++++++++++--- .../scylla-migration/src/setup.rs | 16 +++++++++++-- 2 files changed, 34 insertions(+), 5 deletions(-) diff --git a/packages/backend/native-utils/scylla-migration/src/cli.rs b/packages/backend/native-utils/scylla-migration/src/cli.rs index 5fbc5e2dd4..ad1942a8c5 100644 --- a/packages/backend/native-utils/scylla-migration/src/cli.rs +++ b/packages/backend/native-utils/scylla-migration/src/cli.rs @@ -41,9 +41,13 @@ pub async fn run_cli() -> Result<(), Error> { .down(num) .await? } - MigrationCommand::Setup { threads , note_since } => { + MigrationCommand::Setup { + threads, + note_since, + note_skip + } => { let initializer = Initializer::new(&scylla_conf, &config.db).await?; - initializer.setup(threads, note_since).await?; + initializer.setup(threads, note_skip, note_since).await?; } _ => {} }; @@ -125,7 +129,20 @@ pub(crate) enum MigrationCommand { display_order = 41 )] threads: u32, - #[clap(value_parser, long, help = "Note ID to begin copying")] + #[clap( + value_parser, + long, + help = "Note ID to begin copying", + display_order = 42 + )] note_since: Option, + #[clap( + value_parser, + long, + default_value = "0", + help = "Tne number of notes to be skipped", + display_order = 43 + )] + note_skip: u64, }, } diff --git a/packages/backend/native-utils/scylla-migration/src/setup.rs b/packages/backend/native-utils/scylla-migration/src/setup.rs index aeaab504e6..04bf565e02 100644 --- a/packages/backend/native-utils/scylla-migration/src/setup.rs +++ b/packages/backend/native-utils/scylla-migration/src/setup.rs @@ -59,7 +59,12 @@ impl Initializer { }) } - pub(crate) async fn setup(&self, threads: u32, since: Option) -> Result<(), Error> { + pub(crate) async fn setup( + &self, + threads: u32, + skip: u64, + since: Option, + ) -> Result<(), Error> { println!("Several tables in PostgreSQL are going to be moved to ScyllaDB."); let pool = Database::connect(&self.postgres_url).await?; @@ -84,7 +89,7 @@ impl Initializer { } println!("Copying data from PostgreSQL to ScyllaDB."); - self.copy(pool.clone(), threads.try_into().unwrap_or(1), since) + self.copy(pool.clone(), threads.try_into().unwrap_or(1), skip, since) .await?; println!("Dropping constraints from PostgreSQL."); @@ -131,6 +136,7 @@ impl Initializer { &self, db: DatabaseConnection, threads: usize, + note_skip: u64, note_since: Option, ) -> Result<(), Error> { let note_prepared = Arc::new(self.scylla.prepare(INSERT_NOTE).await?); @@ -209,7 +215,13 @@ impl Initializer { } let mut notes = notes.stream(&db).await?; + let mut copied: u64 = 0; + while let Some(note) = notes.try_next().await? { + copied += 1; + if copied <= note_skip { + continue; + } if let Ok(permit) = Arc::clone(&sem).acquire_owned().await { let (s, d, n, h, p) = ( self.clone(), From e3a1c0d14d829d2d15af45d51bf0899411b30db5 Mon Sep 17 00:00:00 2001 From: Namekuji Date: Mon, 25 Sep 2023 17:05:49 -0400 Subject: [PATCH 3/7] fix: typo --- packages/backend/native-utils/scylla-migration/src/cli.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/backend/native-utils/scylla-migration/src/cli.rs b/packages/backend/native-utils/scylla-migration/src/cli.rs index ad1942a8c5..be8d83d1a6 100644 --- a/packages/backend/native-utils/scylla-migration/src/cli.rs +++ b/packages/backend/native-utils/scylla-migration/src/cli.rs @@ -140,7 +140,7 @@ pub(crate) enum MigrationCommand { value_parser, long, default_value = "0", - help = "Tne number of notes to be skipped", + help = "The number of notes to be skipped while copying", display_order = 43 )] note_skip: u64, From 6c378818fb9baf8ad110a3339369ed60fe828dc0 Mon Sep 17 00:00:00 2001 From: Namekuji Date: Mon, 25 Sep 2023 18:45:01 -0400 Subject: [PATCH 4/7] fix: progress bar --- .../native-utils/scylla-migration/src/setup.rs | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/packages/backend/native-utils/scylla-migration/src/setup.rs b/packages/backend/native-utils/scylla-migration/src/setup.rs index 04bf565e02..30ac208c30 100644 --- a/packages/backend/native-utils/scylla-migration/src/setup.rs +++ b/packages/backend/native-utils/scylla-migration/src/setup.rs @@ -146,13 +146,18 @@ impl Initializer { let vote_select_prepared = Arc::new(self.scylla.prepare(SELECT_POLL_VOTE).await?); let notification_prepared = Arc::new(self.scylla.prepare(INSERT_NOTIFICATION).await?); - let num_notes: i64 = note::Entity::find() - .select_only() - .column_as(note::Column::Id.count(), "count") + let mut num_notes = note::Entity::find(); + + if let Some(since) = note_since.clone() { + num_notes = num_notes.filter(note::Column::Id.gt(&since)); + } + + let mut num_notes: u64 = num_notes.select_only().column_as(note::Column::Id.count(), "count") .into_tuple() .one(&db) .await? .unwrap_or_default(); + num_notes -= note_skip; println!("Posts: {num_notes}"); let num_reactions: i64 = note_reaction::Entity::find() .select_only() From 08d29f6da1de8549467ff749630ceddb068f6dfd Mon Sep 17 00:00:00 2001 From: Namekuji Date: Mon, 25 Sep 2023 19:02:01 -0400 Subject: [PATCH 5/7] fix: i64 --- .../backend/native-utils/scylla-migration/src/setup.rs | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/packages/backend/native-utils/scylla-migration/src/setup.rs b/packages/backend/native-utils/scylla-migration/src/setup.rs index 30ac208c30..067dfff749 100644 --- a/packages/backend/native-utils/scylla-migration/src/setup.rs +++ b/packages/backend/native-utils/scylla-migration/src/setup.rs @@ -62,7 +62,7 @@ impl Initializer { pub(crate) async fn setup( &self, threads: u32, - skip: u64, + skip: i64, since: Option, ) -> Result<(), Error> { println!("Several tables in PostgreSQL are going to be moved to ScyllaDB."); @@ -152,12 +152,14 @@ impl Initializer { num_notes = num_notes.filter(note::Column::Id.gt(&since)); } - let mut num_notes: u64 = num_notes.select_only().column_as(note::Column::Id.count(), "count") + let mut num_notes: i64 = num_notes + .select_only() + .column_as(note::Column::Id.count(), "count") .into_tuple() .one(&db) .await? .unwrap_or_default(); - num_notes -= note_skip; + num_notes -= note_skip as i64; println!("Posts: {num_notes}"); let num_reactions: i64 = note_reaction::Entity::find() .select_only() @@ -220,7 +222,7 @@ impl Initializer { } let mut notes = notes.stream(&db).await?; - let mut copied: u64 = 0; + let mut copied: i64 = 0; while let Some(note) = notes.try_next().await? { copied += 1; From 6f5c8a8917263ba624f65ab0403710d0b9cd68b3 Mon Sep 17 00:00:00 2001 From: Namekuji Date: Mon, 25 Sep 2023 19:02:42 -0400 Subject: [PATCH 6/7] fix: type consistency --- packages/backend/native-utils/scylla-migration/src/setup.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/backend/native-utils/scylla-migration/src/setup.rs b/packages/backend/native-utils/scylla-migration/src/setup.rs index 067dfff749..8748fcc357 100644 --- a/packages/backend/native-utils/scylla-migration/src/setup.rs +++ b/packages/backend/native-utils/scylla-migration/src/setup.rs @@ -222,7 +222,7 @@ impl Initializer { } let mut notes = notes.stream(&db).await?; - let mut copied: i64 = 0; + let mut copied: u64 = 0; while let Some(note) = notes.try_next().await? { copied += 1; From d854c749c04d85b585884123a80a63802fb33262 Mon Sep 17 00:00:00 2001 From: Namekuji Date: Mon, 25 Sep 2023 19:04:30 -0400 Subject: [PATCH 7/7] fix: type consistencies --- packages/backend/native-utils/scylla-migration/src/setup.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/backend/native-utils/scylla-migration/src/setup.rs b/packages/backend/native-utils/scylla-migration/src/setup.rs index 8748fcc357..bc011a9eaa 100644 --- a/packages/backend/native-utils/scylla-migration/src/setup.rs +++ b/packages/backend/native-utils/scylla-migration/src/setup.rs @@ -62,7 +62,7 @@ impl Initializer { pub(crate) async fn setup( &self, threads: u32, - skip: i64, + skip: u64, since: Option, ) -> Result<(), Error> { println!("Several tables in PostgreSQL are going to be moved to ScyllaDB.");