add note-skip option

This commit is contained in:
Namekuji 2023-09-25 17:02:03 -04:00
parent a683766eec
commit cdf5a31fd2
No known key found for this signature in database
GPG key ID: 1D62332C07FBA532
2 changed files with 34 additions and 5 deletions

View file

@ -41,9 +41,13 @@ pub async fn run_cli() -> Result<(), Error> {
.down(num) .down(num)
.await? .await?
} }
MigrationCommand::Setup { threads , note_since } => { MigrationCommand::Setup {
threads,
note_since,
note_skip
} => {
let initializer = Initializer::new(&scylla_conf, &config.db).await?; let initializer = Initializer::new(&scylla_conf, &config.db).await?;
initializer.setup(threads, note_since).await?; initializer.setup(threads, note_skip, note_since).await?;
} }
_ => {} _ => {}
}; };
@ -125,7 +129,20 @@ pub(crate) enum MigrationCommand {
display_order = 41 display_order = 41
)] )]
threads: u32, threads: u32,
#[clap(value_parser, long, help = "Note ID to begin copying")] #[clap(
value_parser,
long,
help = "Note ID to begin copying",
display_order = 42
)]
note_since: Option<String>, note_since: Option<String>,
#[clap(
value_parser,
long,
default_value = "0",
help = "Tne number of notes to be skipped",
display_order = 43
)]
note_skip: u64,
}, },
} }

View file

@ -59,7 +59,12 @@ impl Initializer {
}) })
} }
pub(crate) async fn setup(&self, threads: u32, since: Option<String>) -> Result<(), Error> { pub(crate) async fn setup(
&self,
threads: u32,
skip: u64,
since: Option<String>,
) -> Result<(), Error> {
println!("Several tables in PostgreSQL are going to be moved to ScyllaDB."); println!("Several tables in PostgreSQL are going to be moved to ScyllaDB.");
let pool = Database::connect(&self.postgres_url).await?; let pool = Database::connect(&self.postgres_url).await?;
@ -84,7 +89,7 @@ impl Initializer {
} }
println!("Copying data from PostgreSQL to ScyllaDB."); println!("Copying data from PostgreSQL to ScyllaDB.");
self.copy(pool.clone(), threads.try_into().unwrap_or(1), since) self.copy(pool.clone(), threads.try_into().unwrap_or(1), skip, since)
.await?; .await?;
println!("Dropping constraints from PostgreSQL."); println!("Dropping constraints from PostgreSQL.");
@ -131,6 +136,7 @@ impl Initializer {
&self, &self,
db: DatabaseConnection, db: DatabaseConnection,
threads: usize, threads: usize,
note_skip: u64,
note_since: Option<String>, note_since: Option<String>,
) -> Result<(), Error> { ) -> Result<(), Error> {
let note_prepared = Arc::new(self.scylla.prepare(INSERT_NOTE).await?); let note_prepared = Arc::new(self.scylla.prepare(INSERT_NOTE).await?);
@ -209,7 +215,13 @@ impl Initializer {
} }
let mut notes = notes.stream(&db).await?; let mut notes = notes.stream(&db).await?;
let mut copied: u64 = 0;
while let Some(note) = notes.try_next().await? { while let Some(note) = notes.try_next().await? {
copied += 1;
if copied <= note_skip {
continue;
}
if let Ok(permit) = Arc::clone(&sem).acquire_owned().await { if let Ok(permit) = Arc::clone(&sem).acquire_owned().await {
let (s, d, n, h, p) = ( let (s, d, n, h, p) = (
self.clone(), self.clone(),