diff --git a/packages/backend/native-utils/scylla-migration/src/cli.rs b/packages/backend/native-utils/scylla-migration/src/cli.rs index c772dcb504..be8d83d1a6 100644 --- a/packages/backend/native-utils/scylla-migration/src/cli.rs +++ b/packages/backend/native-utils/scylla-migration/src/cli.rs @@ -41,9 +41,13 @@ pub async fn run_cli() -> Result<(), Error> { .down(num) .await? } - MigrationCommand::Setup { threads } => { + MigrationCommand::Setup { + threads, + note_since, + note_skip + } => { let initializer = Initializer::new(&scylla_conf, &config.db).await?; - initializer.setup(threads).await?; + initializer.setup(threads, note_skip, note_since).await?; } _ => {} }; @@ -125,5 +129,20 @@ pub(crate) enum MigrationCommand { display_order = 41 )] threads: u32, + #[clap( + value_parser, + long, + help = "Note ID to begin copying", + display_order = 42 + )] + note_since: Option, + #[clap( + value_parser, + long, + default_value = "0", + help = "The number of notes to be skipped while copying", + display_order = 43 + )] + note_skip: u64, }, } diff --git a/packages/backend/native-utils/scylla-migration/src/setup.rs b/packages/backend/native-utils/scylla-migration/src/setup.rs index a952703c13..bc011a9eaa 100644 --- a/packages/backend/native-utils/scylla-migration/src/setup.rs +++ b/packages/backend/native-utils/scylla-migration/src/setup.rs @@ -32,14 +32,12 @@ impl Initializer { scylla_conf: &ScyllaConfig, postgres_conf: &DbConfig, ) -> Result { - - let mut builder = SessionBuilder::new() - .known_nodes(&scylla_conf.nodes); + let mut builder = SessionBuilder::new().known_nodes(&scylla_conf.nodes); match &scylla_conf.credentials { Some(credentials) => { builder = builder.user(&credentials.username, &credentials.password); - }, + } None => {} } @@ -61,7 +59,12 @@ impl Initializer { }) } - pub(crate) async fn setup(&self, threads: u32) -> Result<(), Error> { + pub(crate) async fn setup( + &self, + threads: u32, + skip: u64, + since: Option, + ) -> Result<(), Error> { println!("Several tables in PostgreSQL are going to be moved to ScyllaDB."); let pool = Database::connect(&self.postgres_url).await?; @@ -86,7 +89,7 @@ impl Initializer { } println!("Copying data from PostgreSQL to ScyllaDB."); - self.copy(pool.clone(), threads.try_into().unwrap_or(1)) + self.copy(pool.clone(), threads.try_into().unwrap_or(1), skip, since) .await?; println!("Dropping constraints from PostgreSQL."); @@ -129,7 +132,13 @@ impl Initializer { Ok(()) } - async fn copy(&self, db: DatabaseConnection, threads: usize) -> Result<(), Error> { + async fn copy( + &self, + db: DatabaseConnection, + threads: usize, + note_skip: u64, + note_since: Option, + ) -> Result<(), Error> { let note_prepared = Arc::new(self.scylla.prepare(INSERT_NOTE).await?); let home_prepared = Arc::new(self.scylla.prepare(INSERT_HOME_TIMELINE).await?); let reaction_prepared = Arc::new(self.scylla.prepare(INSERT_REACTION).await?); @@ -137,13 +146,20 @@ impl Initializer { let vote_select_prepared = Arc::new(self.scylla.prepare(SELECT_POLL_VOTE).await?); let notification_prepared = Arc::new(self.scylla.prepare(INSERT_NOTIFICATION).await?); - let num_notes: i64 = note::Entity::find() + let mut num_notes = note::Entity::find(); + + if let Some(since) = note_since.clone() { + num_notes = num_notes.filter(note::Column::Id.gt(&since)); + } + + let mut num_notes: i64 = num_notes .select_only() .column_as(note::Column::Id.count(), "count") .into_tuple() .one(&db) .await? .unwrap_or_default(); + num_notes -= note_skip as i64; println!("Posts: {num_notes}"); let num_reactions: i64 = note_reaction::Entity::find() .select_only() @@ -200,11 +216,19 @@ impl Initializer { let mut tasks = Vec::new(); let sem = Arc::new(Semaphore::new(threads)); - let mut notes = note::Entity::find() - .order_by_asc(note::Column::Id) - .stream(&db) - .await?; + let mut notes = note::Entity::find().order_by_asc(note::Column::Id); + if let Some(since_id) = note_since { + notes = notes.filter(note::Column::Id.gt(&since_id)); + } + let mut notes = notes.stream(&db).await?; + + let mut copied: u64 = 0; + while let Some(note) = notes.try_next().await? { + copied += 1; + if copied <= note_skip { + continue; + } if let Ok(permit) = Arc::clone(&sem).acquire_owned().await { let (s, d, n, h, p) = ( self.clone(), @@ -419,7 +443,6 @@ impl Initializer { .execute(&insert_note, scylla_note.to_owned()) .await?; - let mut home_tasks = Vec::new(); let s_note = scylla_note.clone(); let mut home = HomeTimelineTable { feed_user_id: s_note.user_id.clone(), @@ -466,7 +489,7 @@ impl Initializer { updated_at: s_note.updated_at, }; if s_note.user_host == "local" { - home_tasks.push(self.scylla.execute(&insert_home, home.clone())); + self.scylla.execute(&insert_home, home.clone()).await?; } let mut local_followers = following::Entity::find() @@ -479,9 +502,9 @@ impl Initializer { .await?; while let Some(follower_id) = local_followers.try_next().await? { home.feed_user_id = follower_id; - home_tasks.push(self.scylla.execute(&insert_home, home.clone())); + // Ignore even if ScyllaDB timeouts so that we can try to insert the note to the remaining timelines. + let _ = self.scylla.execute(&insert_home, home.clone()).await; } - future::try_join_all(home_tasks).await?; Ok(()) }