From cdf5a31fd2b264c925df067bce6a2c593161b3c1 Mon Sep 17 00:00:00 2001 From: Namekuji Date: Mon, 25 Sep 2023 17:02:03 -0400 Subject: [PATCH] add note-skip option --- .../native-utils/scylla-migration/src/cli.rs | 23 ++++++++++++++++--- .../scylla-migration/src/setup.rs | 16 +++++++++++-- 2 files changed, 34 insertions(+), 5 deletions(-) diff --git a/packages/backend/native-utils/scylla-migration/src/cli.rs b/packages/backend/native-utils/scylla-migration/src/cli.rs index 5fbc5e2dd4..ad1942a8c5 100644 --- a/packages/backend/native-utils/scylla-migration/src/cli.rs +++ b/packages/backend/native-utils/scylla-migration/src/cli.rs @@ -41,9 +41,13 @@ pub async fn run_cli() -> Result<(), Error> { .down(num) .await? } - MigrationCommand::Setup { threads , note_since } => { + MigrationCommand::Setup { + threads, + note_since, + note_skip + } => { let initializer = Initializer::new(&scylla_conf, &config.db).await?; - initializer.setup(threads, note_since).await?; + initializer.setup(threads, note_skip, note_since).await?; } _ => {} }; @@ -125,7 +129,20 @@ pub(crate) enum MigrationCommand { display_order = 41 )] threads: u32, - #[clap(value_parser, long, help = "Note ID to begin copying")] + #[clap( + value_parser, + long, + help = "Note ID to begin copying", + display_order = 42 + )] note_since: Option, + #[clap( + value_parser, + long, + default_value = "0", + help = "Tne number of notes to be skipped", + display_order = 43 + )] + note_skip: u64, }, } diff --git a/packages/backend/native-utils/scylla-migration/src/setup.rs b/packages/backend/native-utils/scylla-migration/src/setup.rs index aeaab504e6..04bf565e02 100644 --- a/packages/backend/native-utils/scylla-migration/src/setup.rs +++ b/packages/backend/native-utils/scylla-migration/src/setup.rs @@ -59,7 +59,12 @@ impl Initializer { }) } - pub(crate) async fn setup(&self, threads: u32, since: Option) -> Result<(), Error> { + pub(crate) async fn setup( + &self, + threads: u32, + skip: u64, + since: Option, + ) -> Result<(), Error> { println!("Several tables in PostgreSQL are going to be moved to ScyllaDB."); let pool = Database::connect(&self.postgres_url).await?; @@ -84,7 +89,7 @@ impl Initializer { } println!("Copying data from PostgreSQL to ScyllaDB."); - self.copy(pool.clone(), threads.try_into().unwrap_or(1), since) + self.copy(pool.clone(), threads.try_into().unwrap_or(1), skip, since) .await?; println!("Dropping constraints from PostgreSQL."); @@ -131,6 +136,7 @@ impl Initializer { &self, db: DatabaseConnection, threads: usize, + note_skip: u64, note_since: Option, ) -> Result<(), Error> { let note_prepared = Arc::new(self.scylla.prepare(INSERT_NOTE).await?); @@ -209,7 +215,13 @@ impl Initializer { } let mut notes = notes.stream(&db).await?; + let mut copied: u64 = 0; + while let Some(note) = notes.try_next().await? { + copied += 1; + if copied <= note_skip { + continue; + } if let Ok(permit) = Arc::clone(&sem).acquire_owned().await { let (s, d, n, h, p) = ( self.clone(),