fix: tasks

This commit is contained in:
Namekuji 2023-09-09 04:39:06 -04:00
parent ee71630842
commit 641b14b9ea
No known key found for this signature in database
GPG key ID: 1D62332C07FBA532
5 changed files with 105 additions and 80 deletions

View file

@ -16,7 +16,7 @@
"init": "pnpm run migrate",
"migrate": "pnpm --filter backend run migrate",
"scylla:migrate": "pnpm --filter native-utils run scylla:migrate",
"scylla:setup": "pnpm --filter native-utils run scylla:setup && pnpm run scylla:migrate",
"scylla:setup": "pnpm run scylla:migrate && pnpm --filter native-utils run scylla:setup",
"revertmigration": "pnpm --filter backend run revertmigration",
"migrateandstart": "pnpm run migrate && pnpm run start",
"gulp": "gulp build",

View file

@ -1215,6 +1215,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0b297dc40733f23a0e52728a58fa9489a5b7638a324932de16b41adc3ef80730"
dependencies = [
"console",
"futures-core",
"instant",
"number_prefix",
"portable-atomic",

View file

@ -38,8 +38,8 @@
"build:napi": "napi build --features napi --platform --release ./built/",
"build:migration": "cargo build --locked --release --manifest-path ./migration/Cargo.toml && cp -v ./target/release/migration ./built/migration",
"build:debug": "napi build --features napi --platform ./built/ && cargo build --locked --manifest-path ./migration/Cargo.toml && cp -v ./target/debug/migration ./built/migration",
"scylla:migrate": "cargo run --release --locked --manifest-path ./scylla-migration/Cargo.toml -- -d ./scylla-migration/cql -c ../../../.config/default.yml up",
"scylla:setup": "cargo run --release --locked --manifest-path ./scylla-migration/Cargo.toml -- -d ./scylla-migration/cql -c ../../../.config/default.yml setup",
"scylla:migrate": "cargo run --locked --manifest-path ./scylla-migration/Cargo.toml -- -d ./scylla-migration/cql -c ../../../.config/default.yml up",
"scylla:setup": "cargo run --locked --manifest-path ./scylla-migration/Cargo.toml -- -d ./scylla-migration/cql -c ../../../.config/default.yml setup",
"prepublishOnly": "napi prepublish -t npm",
"test": "pnpm run cargo:test && pnpm run build:napi && ava",
"universal": "napi universal",

View file

@ -10,7 +10,7 @@ chrono = "0.4.26"
clap = { version = "4.3.11", features = ["derive"] }
dialoguer = "0.10.4"
futures = "0.3.28"
indicatif = { version = "0.17.6", features = ["tokio"] }
indicatif = { version = "0.17.6", features = ["tokio", "futures"] }
scylla = "0.8.2"
sea-orm = { version = "0.12.2", features = ["sqlx-postgres", "runtime-tokio-rustls"] }
serde = { version = "1.0.171", features = ["derive"] }

View file

@ -1,9 +1,10 @@
use std::collections::HashMap;
use std::sync::Arc;
use chrono::{DateTime, NaiveDate, Utc};
use dialoguer::{theme::ColorfulTheme, Confirm};
use futures::{future, TryStreamExt};
use indicatif::{ProgressBar, ProgressStyle};
use futures::{future, TryFutureExt, TryStreamExt};
use indicatif::{MultiProgress, ProgressBar, ProgressStyle};
use scylla::{
prepared_statement::PreparedStatement, FromRow, FromUserType, IntoUserType, Session,
SessionBuilder, ValueList,
@ -19,8 +20,9 @@ use crate::{
error::Error,
};
#[derive(Clone)]
pub(crate) struct Initializer {
scylla: Session,
scylla: Arc<Session>,
postgres_url: String,
}
@ -33,6 +35,7 @@ impl Initializer {
.known_nodes(&scylla_conf.nodes)
.build()
.await?;
session.use_keyspace(&scylla_conf.keyspace, true).await?;
let conn_url = format!(
"postgres://{}:{}@{}:{}/{}",
@ -44,7 +47,7 @@ impl Initializer {
);
Ok(Self {
scylla: session,
scylla: Arc::new(session),
postgres_url: conn_url,
})
}
@ -74,7 +77,9 @@ impl Initializer {
}
println!("Copyping data from PostgreSQL to ScyllaDB.");
self.copy(&pool).await?;
self.copy(pool.clone()).await?;
return Ok(());
println!("Dropping constraints from PostgreSQL.");
let fk_pairs = vec![
@ -116,7 +121,7 @@ impl Initializer {
Ok(())
}
async fn copy(&self, db: &DatabaseConnection) -> Result<(), Error> {
async fn copy(&self, db: DatabaseConnection) -> Result<(), Error> {
let note_prepared = self.scylla.prepare(INSERT_NOTE).await?;
let home_prepared = self.scylla.prepare(INSERT_HOME_TIMELINE).await?;
let reaction_prepared = self.scylla.prepare(INSERT_REACTION).await?;
@ -128,93 +133,120 @@ impl Initializer {
.select_only()
.column_as(note::Column::Id.count(), "count")
.into_tuple()
.one(db)
.one(&db)
.await?
.unwrap_or_default();
println!("Posts: {num_notes}");
let num_reactions: i64 = note_reaction::Entity::find()
.select_only()
.column_as(note_reaction::Column::Id.count(), "count")
.into_tuple()
.one(db)
.one(&db)
.await?
.unwrap_or_default();
println!("Reactions: {num_reactions}");
let num_votes: i64 = poll_vote::Entity::find()
.select_only()
.column_as(poll_vote::Column::Id.count(), "count")
.into_tuple()
.one(db)
.one(&db)
.await?
.unwrap_or_default();
println!("Votes: {num_votes}");
let num_notifications: i64 = notification::Entity::find()
.select_only()
.column_as(notification::Column::Id.count(), "count")
.into_tuple()
.one(db)
.one(&db)
.await?
.unwrap_or_default();
println!("Notifications: {num_notifications}");
const PB_TMPL: &str =
"{spinner:.green} [{elapsed_precise}] [{wide_bar:.cyan/blue}] {pos}/{len} ({eta})";
"{spinner:.green} {prefix} [{elapsed_precise}] [{wide_bar:.cyan/blue}] {pos}/{len} ({eta})";
let pb_style = ProgressStyle::with_template(PB_TMPL)
.unwrap()
.progress_chars("#>-");
let note_pb = ProgressBar::new(num_notes as u64).with_style(pb_style.to_owned());
let reaction_pb = ProgressBar::new(num_reactions as u64).with_style(pb_style.to_owned());
let vote_pb = ProgressBar::new(num_votes as u64).with_style(pb_style.to_owned());
let mp = MultiProgress::new();
let note_pb = mp.add(ProgressBar::new(num_notes as u64).with_style(pb_style.to_owned()).with_prefix("Posts"));
let reaction_pb =
mp.add(ProgressBar::new(num_reactions as u64).with_style(pb_style.to_owned()).with_prefix("Reactions"));
let vote_pb = mp.add(ProgressBar::new(num_votes as u64).with_style(pb_style.to_owned()).with_prefix("Votes"));
let notification_pb =
ProgressBar::new(num_notifications as u64).with_style(pb_style.to_owned());
mp.add(ProgressBar::new(num_notifications as u64).with_style(pb_style.to_owned()).with_prefix("Notifications"));
let mut tasks = Vec::new();
let mut note_tasks = Vec::new();
let mut notes = note::Entity::find()
.order_by_asc(note::Column::Id)
.stream(db)
.stream(&db)
.await?;
while let Some(note) = notes.try_next().await? {
note_tasks.push(self.copy_note(note, db, &note_prepared, &home_prepared, &note_pb));
let (s, d, n, h, p) = (
self.clone(),
db.clone(),
note_prepared.clone(),
home_prepared.clone(),
note_pb.clone(),
);
let handler = tokio::spawn(async move {
let _ = s.copy_note(note, d, h, n).await;
p.inc(1);
});
tasks.push(handler);
}
let mut reaction_tasks = Vec::new();
let mut reactions = note_reaction::Entity::find()
.order_by_asc(note_reaction::Column::Id)
.stream(db)
.stream(&db)
.await?;
while let Some(reaction) = reactions.try_next().await? {
reaction_tasks.push(self.copy_reaction(reaction, &reaction_prepared, &reaction_pb));
let (s, r, p) = (self.clone(), reaction_prepared.clone(), reaction_pb.clone());
let handler = tokio::spawn(async move {
let _ = s.copy_reaction(reaction, r).await;
p.inc(1);
});
tasks.push(handler);
}
let mut vote_tasks = Vec::new();
let mut votes = poll_vote::Entity::find()
.order_by_asc(poll_vote::Column::Id)
.stream(db)
.stream(&db)
.await?;
while let Some(vote) = votes.try_next().await? {
vote_tasks.push(self.copy_vote(
vote,
db,
&vote_select_prepared,
&vote_insert_prepared,
&vote_pb,
));
let (s, d, sp, ip, p) = (
self.clone(),
db.clone(),
vote_select_prepared.clone(),
vote_insert_prepared.clone(),
vote_pb.clone(),
);
let handler = tokio::spawn(async move {
let _ = s.copy_vote(vote, d, sp, ip).await;
p.inc(1);
});
tasks.push(handler);
}
let mut notification_tasks = Vec::new();
let mut notifications = notification::Entity::find()
.order_by_asc(notification::Column::Id)
.stream(db)
.stream(&db)
.await?;
while let Some(n) = notifications.try_next().await? {
notification_tasks.push(self.copy_notification(
n,
db,
&notification_prepared,
&notification_pb,
));
let (s, d, ps, p) = (
self.clone(),
db.clone(),
notification_prepared.clone(),
notification_pb.clone(),
);
let handler = tokio::spawn(async move {
let _ = s.copy_notification(n, d, ps).await;
p.inc(1);
});
tasks.push(handler);
}
future::try_join_all(note_tasks).await?;
future::try_join_all(reaction_tasks).await?;
future::try_join_all(vote_tasks).await?;
future::try_join_all(notification_tasks).await?;
future::join_all(tasks).await;
Ok(())
}
@ -222,25 +254,24 @@ impl Initializer {
async fn copy_note(
&self,
note: note::Model,
db: &DatabaseConnection,
insert_note: &PreparedStatement,
insert_home: &PreparedStatement,
pb: &ProgressBar,
db: DatabaseConnection,
insert_note: PreparedStatement,
insert_home: PreparedStatement,
) -> Result<(), Error> {
let reply = match &note.reply_id {
None => None,
Some(id) => note::Entity::find_by_id(id).one(db).await?,
Some(id) => note::Entity::find_by_id(id).one(&db).await?,
};
let renote = match &note.renote_id {
None => None,
Some(id) => note::Entity::find_by_id(id).one(db).await?,
Some(id) => note::Entity::find_by_id(id).one(&db).await?,
};
let files = get_attached_files(Some(note.to_owned()), db).await?;
let reply_files = get_attached_files(reply.to_owned(), db).await?;
let renote_files = get_attached_files(renote.to_owned(), db).await?;
let files = get_attached_files(Some(note.to_owned()), &db).await?;
let reply_files = get_attached_files(reply.to_owned(), &db).await?;
let renote_files = get_attached_files(renote.to_owned(), &db).await?;
let note_poll = note.find_related(poll::Entity).one(db).await?;
let note_poll = note.find_related(poll::Entity).one(&db).await?;
let poll_type = match note_poll {
None => None,
Some(v) => Some(PollType {
@ -262,13 +293,13 @@ impl Initializer {
.map(|(k, v)| (k.to_string(), v.as_i64().unwrap_or_default() as i32)),
),
};
let edits = note.find_related(note_edit::Entity).all(db).await?;
let edits = note.find_related(note_edit::Entity).all(&db).await?;
let edits = edits.iter().map(|v| async {
let v = v.to_owned();
Ok(NoteEditHistoryType {
content: v.text,
cw: v.cw,
files: get_files(v.file_ids, db).await?,
files: get_files(v.file_ids, &db).await?,
updated_at: v.updated_at.into(),
})
});
@ -323,7 +354,7 @@ impl Initializer {
};
self.scylla
.execute(insert_note, scylla_note.to_owned())
.execute(&insert_note, scylla_note.to_owned())
.await?;
let mut home_tasks = Vec::new();
@ -333,7 +364,7 @@ impl Initializer {
.filter(following::Column::FolloweeId.eq(note.user_id))
.filter(following::Column::FollowerHost.is_null())
.into_tuple::<String>()
.stream(db)
.stream(&db)
.await?;
while let Some(follower_id) = local_followers.try_next().await? {
@ -381,19 +412,17 @@ impl Initializer {
note_edit: s_note.note_edit,
updated_at: s_note.updated_at,
};
home_tasks.push(self.scylla.execute(insert_home, home));
home_tasks.push(self.scylla.execute(&insert_home, home));
}
future::try_join_all(home_tasks).await?;
pb.inc(1);
Ok(())
}
async fn copy_reaction(
&self,
reaction: note_reaction::Model,
insert: &PreparedStatement,
pb: &ProgressBar,
insert: PreparedStatement,
) -> Result<(), Error> {
let scylla_reaction = ReactionTable {
id: reaction.id,
@ -402,21 +431,19 @@ impl Initializer {
reaction: reaction.reaction,
created_at: reaction.created_at.into(),
};
self.scylla.execute(insert, scylla_reaction).await?;
self.scylla.execute(&insert, scylla_reaction).await?;
pb.inc(1);
Ok(())
}
async fn copy_vote(
&self,
vote: poll_vote::Model,
db: &DatabaseConnection,
select: &PreparedStatement,
insert: &PreparedStatement,
pb: &ProgressBar,
db: DatabaseConnection,
select: PreparedStatement,
insert: PreparedStatement,
) -> Result<(), Error> {
let voted_user = user::Entity::find_by_id(&vote.user_id).one(db).await?;
let voted_user = user::Entity::find_by_id(&vote.user_id).one(&db).await?;
if let Some(u) = voted_user {
let mut scylla_vote = PollVoteTable {
note_id: vote.note_id.to_owned(),
@ -427,7 +454,7 @@ impl Initializer {
};
if let Ok(row) = self
.scylla
.execute(select, (vote.note_id, vote.user_id))
.execute(&select, (vote.note_id, vote.user_id))
.await?
.first_row()
{
@ -435,23 +462,21 @@ impl Initializer {
scylla_vote.choice.append(&mut s_vote.choice);
scylla_vote.choice.dedup();
}
self.scylla.execute(insert, scylla_vote).await?;
self.scylla.execute(&insert, scylla_vote).await?;
}
pb.inc(1);
Ok(())
}
async fn copy_notification(
&self,
model: notification::Model,
db: &DatabaseConnection,
insert: &PreparedStatement,
pb: &ProgressBar,
db: DatabaseConnection,
insert: PreparedStatement,
) -> Result<(), Error> {
let notifier = match model.notifier_id.to_owned() {
None => None,
Some(id) => user::Entity::find_by_id(&id).one(db).await?,
Some(id) => user::Entity::find_by_id(&id).one(&db).await?,
};
let s_notification = NotificationTable {
target_id: model.notifiee_id,
@ -472,9 +497,8 @@ impl Initializer {
custom_header: model.custom_header,
custom_icon: model.custom_icon,
};
self.scylla.execute(insert, s_notification).await?;
self.scylla.execute(&insert, s_notification).await?;
pb.inc(1);
Ok(())
}
}