diff --git a/packages/backend/native-utils/scylla-migration/src/cli.rs b/packages/backend/native-utils/scylla-migration/src/cli.rs index c772dcb504..5fbc5e2dd4 100644 --- a/packages/backend/native-utils/scylla-migration/src/cli.rs +++ b/packages/backend/native-utils/scylla-migration/src/cli.rs @@ -41,9 +41,9 @@ pub async fn run_cli() -> Result<(), Error> { .down(num) .await? } - MigrationCommand::Setup { threads } => { + MigrationCommand::Setup { threads , note_since } => { let initializer = Initializer::new(&scylla_conf, &config.db).await?; - initializer.setup(threads).await?; + initializer.setup(threads, note_since).await?; } _ => {} }; @@ -125,5 +125,7 @@ pub(crate) enum MigrationCommand { display_order = 41 )] threads: u32, + #[clap(value_parser, long, help = "Note ID to begin copying")] + note_since: Option, }, } diff --git a/packages/backend/native-utils/scylla-migration/src/setup.rs b/packages/backend/native-utils/scylla-migration/src/setup.rs index a952703c13..aeaab504e6 100644 --- a/packages/backend/native-utils/scylla-migration/src/setup.rs +++ b/packages/backend/native-utils/scylla-migration/src/setup.rs @@ -32,14 +32,12 @@ impl Initializer { scylla_conf: &ScyllaConfig, postgres_conf: &DbConfig, ) -> Result { - - let mut builder = SessionBuilder::new() - .known_nodes(&scylla_conf.nodes); + let mut builder = SessionBuilder::new().known_nodes(&scylla_conf.nodes); match &scylla_conf.credentials { Some(credentials) => { builder = builder.user(&credentials.username, &credentials.password); - }, + } None => {} } @@ -61,7 +59,7 @@ impl Initializer { }) } - pub(crate) async fn setup(&self, threads: u32) -> Result<(), Error> { + pub(crate) async fn setup(&self, threads: u32, since: Option) -> Result<(), Error> { println!("Several tables in PostgreSQL are going to be moved to ScyllaDB."); let pool = Database::connect(&self.postgres_url).await?; @@ -86,7 +84,7 @@ impl Initializer { } println!("Copying data from PostgreSQL to ScyllaDB."); - self.copy(pool.clone(), 
threads.try_into().unwrap_or(1)) + self.copy(pool.clone(), threads.try_into().unwrap_or(1), since) .await?; println!("Dropping constraints from PostgreSQL."); @@ -129,7 +127,12 @@ impl Initializer { Ok(()) } - async fn copy(&self, db: DatabaseConnection, threads: usize) -> Result<(), Error> { + async fn copy( + &self, + db: DatabaseConnection, + threads: usize, + note_since: Option, + ) -> Result<(), Error> { let note_prepared = Arc::new(self.scylla.prepare(INSERT_NOTE).await?); let home_prepared = Arc::new(self.scylla.prepare(INSERT_HOME_TIMELINE).await?); let reaction_prepared = Arc::new(self.scylla.prepare(INSERT_REACTION).await?); @@ -200,10 +203,12 @@ impl Initializer { let mut tasks = Vec::new(); let sem = Arc::new(Semaphore::new(threads)); - let mut notes = note::Entity::find() - .order_by_asc(note::Column::Id) - .stream(&db) - .await?; + let mut notes = note::Entity::find().order_by_asc(note::Column::Id); + if let Some(since_id) = note_since { + notes = notes.filter(note::Column::Id.gt(&since_id)); + } + let mut notes = notes.stream(&db).await?; + while let Some(note) = notes.try_next().await? { if let Ok(permit) = Arc::clone(&sem).acquire_owned().await { let (s, d, n, h, p) = ( @@ -419,7 +424,6 @@ impl Initializer { .execute(&insert_note, scylla_note.to_owned()) .await?; - let mut home_tasks = Vec::new(); let s_note = scylla_note.clone(); let mut home = HomeTimelineTable { feed_user_id: s_note.user_id.clone(), @@ -466,7 +470,7 @@ impl Initializer { updated_at: s_note.updated_at, }; if s_note.user_host == "local" { - home_tasks.push(self.scylla.execute(&insert_home, home.clone())); + self.scylla.execute(&insert_home, home.clone()).await?; } let mut local_followers = following::Entity::find() @@ -479,9 +483,9 @@ impl Initializer { .await?; while let Some(follower_id) = local_followers.try_next().await? 
{ home.feed_user_id = follower_id; - home_tasks.push(self.scylla.execute(&insert_home, home.clone())); + // Ignore failures (e.g. ScyllaDB timing out) so that we can still try to insert the note into the remaining timelines. + let _ = self.scylla.execute(&insert_home, home.clone()).await; } - future::try_join_all(home_tasks).await?; Ok(()) }