Merge branch 'feat/scylladb' of git.joinfirefish.org:firefish/firefish into feat/scylladb

This commit is contained in:
ThatOneCalculator 2023-09-25 21:14:27 -07:00
commit c07e8c303e
No known key found for this signature in database
GPG key ID: 8703CACD01000000
2 changed files with 60 additions and 18 deletions

View file

@ -41,9 +41,13 @@ pub async fn run_cli() -> Result<(), Error> {
.down(num) .down(num)
.await? .await?
} }
MigrationCommand::Setup { threads } => { MigrationCommand::Setup {
threads,
note_since,
note_skip
} => {
let initializer = Initializer::new(&scylla_conf, &config.db).await?; let initializer = Initializer::new(&scylla_conf, &config.db).await?;
initializer.setup(threads).await?; initializer.setup(threads, note_skip, note_since).await?;
} }
_ => {} _ => {}
}; };
@ -125,5 +129,20 @@ pub(crate) enum MigrationCommand {
display_order = 41 display_order = 41
)] )]
threads: u32, threads: u32,
#[clap(
value_parser,
long,
help = "Note ID to begin copying",
display_order = 42
)]
note_since: Option<String>,
#[clap(
value_parser,
long,
default_value = "0",
help = "The number of notes to be skipped while copying",
display_order = 43
)]
note_skip: u64,
}, },
} }

View file

@ -32,14 +32,12 @@ impl Initializer {
scylla_conf: &ScyllaConfig, scylla_conf: &ScyllaConfig,
postgres_conf: &DbConfig, postgres_conf: &DbConfig,
) -> Result<Self, Error> { ) -> Result<Self, Error> {
let mut builder = SessionBuilder::new().known_nodes(&scylla_conf.nodes);
let mut builder = SessionBuilder::new()
.known_nodes(&scylla_conf.nodes);
match &scylla_conf.credentials { match &scylla_conf.credentials {
Some(credentials) => { Some(credentials) => {
builder = builder.user(&credentials.username, &credentials.password); builder = builder.user(&credentials.username, &credentials.password);
}, }
None => {} None => {}
} }
@ -61,7 +59,12 @@ impl Initializer {
}) })
} }
pub(crate) async fn setup(&self, threads: u32) -> Result<(), Error> { pub(crate) async fn setup(
&self,
threads: u32,
skip: u64,
since: Option<String>,
) -> Result<(), Error> {
println!("Several tables in PostgreSQL are going to be moved to ScyllaDB."); println!("Several tables in PostgreSQL are going to be moved to ScyllaDB.");
let pool = Database::connect(&self.postgres_url).await?; let pool = Database::connect(&self.postgres_url).await?;
@ -86,7 +89,7 @@ impl Initializer {
} }
println!("Copying data from PostgreSQL to ScyllaDB."); println!("Copying data from PostgreSQL to ScyllaDB.");
self.copy(pool.clone(), threads.try_into().unwrap_or(1)) self.copy(pool.clone(), threads.try_into().unwrap_or(1), skip, since)
.await?; .await?;
println!("Dropping constraints from PostgreSQL."); println!("Dropping constraints from PostgreSQL.");
@ -129,7 +132,13 @@ impl Initializer {
Ok(()) Ok(())
} }
async fn copy(&self, db: DatabaseConnection, threads: usize) -> Result<(), Error> { async fn copy(
&self,
db: DatabaseConnection,
threads: usize,
note_skip: u64,
note_since: Option<String>,
) -> Result<(), Error> {
let note_prepared = Arc::new(self.scylla.prepare(INSERT_NOTE).await?); let note_prepared = Arc::new(self.scylla.prepare(INSERT_NOTE).await?);
let home_prepared = Arc::new(self.scylla.prepare(INSERT_HOME_TIMELINE).await?); let home_prepared = Arc::new(self.scylla.prepare(INSERT_HOME_TIMELINE).await?);
let reaction_prepared = Arc::new(self.scylla.prepare(INSERT_REACTION).await?); let reaction_prepared = Arc::new(self.scylla.prepare(INSERT_REACTION).await?);
@ -137,13 +146,20 @@ impl Initializer {
let vote_select_prepared = Arc::new(self.scylla.prepare(SELECT_POLL_VOTE).await?); let vote_select_prepared = Arc::new(self.scylla.prepare(SELECT_POLL_VOTE).await?);
let notification_prepared = Arc::new(self.scylla.prepare(INSERT_NOTIFICATION).await?); let notification_prepared = Arc::new(self.scylla.prepare(INSERT_NOTIFICATION).await?);
let num_notes: i64 = note::Entity::find() let mut num_notes = note::Entity::find();
if let Some(since) = note_since.clone() {
num_notes = num_notes.filter(note::Column::Id.gt(&since));
}
let mut num_notes: i64 = num_notes
.select_only() .select_only()
.column_as(note::Column::Id.count(), "count") .column_as(note::Column::Id.count(), "count")
.into_tuple() .into_tuple()
.one(&db) .one(&db)
.await? .await?
.unwrap_or_default(); .unwrap_or_default();
num_notes -= note_skip as i64;
println!("Posts: {num_notes}"); println!("Posts: {num_notes}");
let num_reactions: i64 = note_reaction::Entity::find() let num_reactions: i64 = note_reaction::Entity::find()
.select_only() .select_only()
@ -200,11 +216,19 @@ impl Initializer {
let mut tasks = Vec::new(); let mut tasks = Vec::new();
let sem = Arc::new(Semaphore::new(threads)); let sem = Arc::new(Semaphore::new(threads));
let mut notes = note::Entity::find() let mut notes = note::Entity::find().order_by_asc(note::Column::Id);
.order_by_asc(note::Column::Id) if let Some(since_id) = note_since {
.stream(&db) notes = notes.filter(note::Column::Id.gt(&since_id));
.await?; }
let mut notes = notes.stream(&db).await?;
let mut copied: u64 = 0;
while let Some(note) = notes.try_next().await? { while let Some(note) = notes.try_next().await? {
copied += 1;
if copied <= note_skip {
continue;
}
if let Ok(permit) = Arc::clone(&sem).acquire_owned().await { if let Ok(permit) = Arc::clone(&sem).acquire_owned().await {
let (s, d, n, h, p) = ( let (s, d, n, h, p) = (
self.clone(), self.clone(),
@ -419,7 +443,6 @@ impl Initializer {
.execute(&insert_note, scylla_note.to_owned()) .execute(&insert_note, scylla_note.to_owned())
.await?; .await?;
let mut home_tasks = Vec::new();
let s_note = scylla_note.clone(); let s_note = scylla_note.clone();
let mut home = HomeTimelineTable { let mut home = HomeTimelineTable {
feed_user_id: s_note.user_id.clone(), feed_user_id: s_note.user_id.clone(),
@ -466,7 +489,7 @@ impl Initializer {
updated_at: s_note.updated_at, updated_at: s_note.updated_at,
}; };
if s_note.user_host == "local" { if s_note.user_host == "local" {
home_tasks.push(self.scylla.execute(&insert_home, home.clone())); self.scylla.execute(&insert_home, home.clone()).await?;
} }
let mut local_followers = following::Entity::find() let mut local_followers = following::Entity::find()
@ -479,9 +502,9 @@ impl Initializer {
.await?; .await?;
while let Some(follower_id) = local_followers.try_next().await? { while let Some(follower_id) = local_followers.try_next().await? {
home.feed_user_id = follower_id; home.feed_user_id = follower_id;
home_tasks.push(self.scylla.execute(&insert_home, home.clone())); // Ignore even if ScyllaDB timeouts so that we can try to insert the note to the remaining timelines.
let _ = self.scylla.execute(&insert_home, home.clone()).await;
} }
future::try_join_all(home_tasks).await?;
Ok(()) Ok(())
} }