add note_since option
This commit is contained in:
parent
3d18fa9507
commit
a683766eec
2 changed files with 23 additions and 17 deletions
|
@ -41,9 +41,9 @@ pub async fn run_cli() -> Result<(), Error> {
|
||||||
.down(num)
|
.down(num)
|
||||||
.await?
|
.await?
|
||||||
}
|
}
|
||||||
MigrationCommand::Setup { threads } => {
|
MigrationCommand::Setup { threads , note_since } => {
|
||||||
let initializer = Initializer::new(&scylla_conf, &config.db).await?;
|
let initializer = Initializer::new(&scylla_conf, &config.db).await?;
|
||||||
initializer.setup(threads).await?;
|
initializer.setup(threads, note_since).await?;
|
||||||
}
|
}
|
||||||
_ => {}
|
_ => {}
|
||||||
};
|
};
|
||||||
|
@ -125,5 +125,7 @@ pub(crate) enum MigrationCommand {
|
||||||
display_order = 41
|
display_order = 41
|
||||||
)]
|
)]
|
||||||
threads: u32,
|
threads: u32,
|
||||||
|
#[clap(value_parser, long, help = "Note ID to begin copying")]
|
||||||
|
note_since: Option<String>,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
|
@ -32,14 +32,12 @@ impl Initializer {
|
||||||
scylla_conf: &ScyllaConfig,
|
scylla_conf: &ScyllaConfig,
|
||||||
postgres_conf: &DbConfig,
|
postgres_conf: &DbConfig,
|
||||||
) -> Result<Self, Error> {
|
) -> Result<Self, Error> {
|
||||||
|
let mut builder = SessionBuilder::new().known_nodes(&scylla_conf.nodes);
|
||||||
let mut builder = SessionBuilder::new()
|
|
||||||
.known_nodes(&scylla_conf.nodes);
|
|
||||||
|
|
||||||
match &scylla_conf.credentials {
|
match &scylla_conf.credentials {
|
||||||
Some(credentials) => {
|
Some(credentials) => {
|
||||||
builder = builder.user(&credentials.username, &credentials.password);
|
builder = builder.user(&credentials.username, &credentials.password);
|
||||||
},
|
}
|
||||||
None => {}
|
None => {}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -61,7 +59,7 @@ impl Initializer {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) async fn setup(&self, threads: u32) -> Result<(), Error> {
|
pub(crate) async fn setup(&self, threads: u32, since: Option<String>) -> Result<(), Error> {
|
||||||
println!("Several tables in PostgreSQL are going to be moved to ScyllaDB.");
|
println!("Several tables in PostgreSQL are going to be moved to ScyllaDB.");
|
||||||
|
|
||||||
let pool = Database::connect(&self.postgres_url).await?;
|
let pool = Database::connect(&self.postgres_url).await?;
|
||||||
|
@ -86,7 +84,7 @@ impl Initializer {
|
||||||
}
|
}
|
||||||
|
|
||||||
println!("Copying data from PostgreSQL to ScyllaDB.");
|
println!("Copying data from PostgreSQL to ScyllaDB.");
|
||||||
self.copy(pool.clone(), threads.try_into().unwrap_or(1))
|
self.copy(pool.clone(), threads.try_into().unwrap_or(1), since)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
println!("Dropping constraints from PostgreSQL.");
|
println!("Dropping constraints from PostgreSQL.");
|
||||||
|
@ -129,7 +127,12 @@ impl Initializer {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn copy(&self, db: DatabaseConnection, threads: usize) -> Result<(), Error> {
|
async fn copy(
|
||||||
|
&self,
|
||||||
|
db: DatabaseConnection,
|
||||||
|
threads: usize,
|
||||||
|
note_since: Option<String>,
|
||||||
|
) -> Result<(), Error> {
|
||||||
let note_prepared = Arc::new(self.scylla.prepare(INSERT_NOTE).await?);
|
let note_prepared = Arc::new(self.scylla.prepare(INSERT_NOTE).await?);
|
||||||
let home_prepared = Arc::new(self.scylla.prepare(INSERT_HOME_TIMELINE).await?);
|
let home_prepared = Arc::new(self.scylla.prepare(INSERT_HOME_TIMELINE).await?);
|
||||||
let reaction_prepared = Arc::new(self.scylla.prepare(INSERT_REACTION).await?);
|
let reaction_prepared = Arc::new(self.scylla.prepare(INSERT_REACTION).await?);
|
||||||
|
@ -200,10 +203,12 @@ impl Initializer {
|
||||||
let mut tasks = Vec::new();
|
let mut tasks = Vec::new();
|
||||||
let sem = Arc::new(Semaphore::new(threads));
|
let sem = Arc::new(Semaphore::new(threads));
|
||||||
|
|
||||||
let mut notes = note::Entity::find()
|
let mut notes = note::Entity::find().order_by_asc(note::Column::Id);
|
||||||
.order_by_asc(note::Column::Id)
|
if let Some(since_id) = note_since {
|
||||||
.stream(&db)
|
notes = notes.filter(note::Column::Id.gt(&since_id));
|
||||||
.await?;
|
}
|
||||||
|
let mut notes = notes.stream(&db).await?;
|
||||||
|
|
||||||
while let Some(note) = notes.try_next().await? {
|
while let Some(note) = notes.try_next().await? {
|
||||||
if let Ok(permit) = Arc::clone(&sem).acquire_owned().await {
|
if let Ok(permit) = Arc::clone(&sem).acquire_owned().await {
|
||||||
let (s, d, n, h, p) = (
|
let (s, d, n, h, p) = (
|
||||||
|
@ -419,7 +424,6 @@ impl Initializer {
|
||||||
.execute(&insert_note, scylla_note.to_owned())
|
.execute(&insert_note, scylla_note.to_owned())
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
let mut home_tasks = Vec::new();
|
|
||||||
let s_note = scylla_note.clone();
|
let s_note = scylla_note.clone();
|
||||||
let mut home = HomeTimelineTable {
|
let mut home = HomeTimelineTable {
|
||||||
feed_user_id: s_note.user_id.clone(),
|
feed_user_id: s_note.user_id.clone(),
|
||||||
|
@ -466,7 +470,7 @@ impl Initializer {
|
||||||
updated_at: s_note.updated_at,
|
updated_at: s_note.updated_at,
|
||||||
};
|
};
|
||||||
if s_note.user_host == "local" {
|
if s_note.user_host == "local" {
|
||||||
home_tasks.push(self.scylla.execute(&insert_home, home.clone()));
|
self.scylla.execute(&insert_home, home.clone()).await?;
|
||||||
}
|
}
|
||||||
|
|
||||||
let mut local_followers = following::Entity::find()
|
let mut local_followers = following::Entity::find()
|
||||||
|
@ -479,9 +483,9 @@ impl Initializer {
|
||||||
.await?;
|
.await?;
|
||||||
while let Some(follower_id) = local_followers.try_next().await? {
|
while let Some(follower_id) = local_followers.try_next().await? {
|
||||||
home.feed_user_id = follower_id;
|
home.feed_user_id = follower_id;
|
||||||
home_tasks.push(self.scylla.execute(&insert_home, home.clone()));
|
// Ignore even if ScyllaDB timeouts so that we can try to insert the note to the remaining timelines.
|
||||||
|
let _ = self.scylla.execute(&insert_home, home.clone()).await;
|
||||||
}
|
}
|
||||||
future::try_join_all(home_tasks).await?;
|
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue