feat: migration command from postgres to scylla

This commit is contained in:
Namekuji 2023-09-09 07:18:07 -04:00
parent 641b14b9ea
commit 0e087e8f82
No known key found for this signature in database
GPG key ID: 1D62332C07FBA532
8 changed files with 161 additions and 104 deletions

View file

@ -16,7 +16,7 @@
"init": "pnpm run migrate",
"migrate": "pnpm --filter backend run migrate",
"scylla:migrate": "pnpm --filter native-utils run scylla:migrate",
"scylla:setup": "pnpm run scylla:migrate && pnpm --filter native-utils run scylla:setup",
"scylla:setup": "pnpm --filter native-utils run scylla:setup",
"revertmigration": "pnpm --filter backend run revertmigration",
"migrateandstart": "pnpm run migrate && pnpm run start",
"gulp": "gulp build",

View file

@ -38,8 +38,8 @@
"build:napi": "napi build --features napi --platform --release ./built/",
"build:migration": "cargo build --locked --release --manifest-path ./migration/Cargo.toml && cp -v ./target/release/migration ./built/migration",
"build:debug": "napi build --features napi --platform ./built/ && cargo build --locked --manifest-path ./migration/Cargo.toml && cp -v ./target/debug/migration ./built/migration",
"scylla:migrate": "cargo run --locked --manifest-path ./scylla-migration/Cargo.toml -- -d ./scylla-migration/cql -c ../../../.config/default.yml up",
"scylla:setup": "cargo run --locked --manifest-path ./scylla-migration/Cargo.toml -- -d ./scylla-migration/cql -c ../../../.config/default.yml setup",
"scylla:migrate": "cargo run --release --locked --manifest-path ./scylla-migration/Cargo.toml -- -d ./scylla-migration/cql -c ../../../.config/default.yml up",
"scylla:setup": "cargo run --release --locked --manifest-path ./scylla-migration/Cargo.toml -- -d ./scylla-migration/cql -c ../../../.config/default.yml setup",
"prepublishOnly": "napi prepublish -t npm",
"test": "pnpm run cargo:test && pnpm run build:napi && ava",
"universal": "napi universal",

View file

@ -41,9 +41,9 @@ pub async fn run_cli() -> Result<(), Error> {
.down(num)
.await?
}
MigrationCommand::Setup => {
MigrationCommand::Setup { multi_thread } => {
let initializer = Initializer::new(&scylla_conf, &config.db).await?;
initializer.setup().await?;
initializer.setup(multi_thread).await?;
}
_ => {}
};
@ -115,5 +115,15 @@ pub(crate) enum MigrationCommand {
num: u32,
},
#[clap(about = "Set up PostgreSQL and ScyllaDB", display_order = 40)]
Setup,
Setup {
#[clap(
value_parser,
short,
long,
default_value = "false",
help = "Enable multi-thread mode (WARNING: High memory consumption)",
display_order = 41
)]
multi_thread: bool,
},
}

View file

@ -6,5 +6,5 @@ pub mod note_reaction;
pub mod notification;
pub mod poll;
pub mod poll_vote;
pub mod user;
pub mod sea_orm_active_enums;
pub mod user;

View file

@ -93,5 +93,4 @@ impl Related<super::poll_vote::Entity> for Entity {
}
}
impl ActiveModelBehavior for ActiveModel {}

View file

@ -11,13 +11,13 @@ use thiserror::Error;
#[derive(Error, Debug)]
pub enum Error {
#[error("ScyllaDB Session error: {0}")]
#[error("ScyllaDB session error: {0}")]
Session(#[from] NewSessionError),
#[error("Query error: {0}")]
#[error("ScyllaDB query error: {0}")]
Query(#[from] QueryError),
#[error("Conversion error: {0}")]
#[error("ScyllaDB conversion error: {0}")]
Conversion(#[from] FromRowError),
#[error("Row error: {0}")]
#[error("ScyllaDB row error: {0}")]
Row(#[from] SingleRowTypedError),
#[error("File error: {0}")]
File(#[from] io::Error),

View file

@ -2,6 +2,12 @@ use scylla_migration::{cli::run_cli, error::Error};
#[tokio::main]
async fn main() -> Result<(), Error> {
let default_panic = std::panic::take_hook();
std::panic::set_hook(Box::new(move |info| {
default_panic(info);
std::process::exit(1);
}));
run_cli().await?;
Ok(())

View file

@ -3,7 +3,7 @@ use std::sync::Arc;
use chrono::{DateTime, NaiveDate, Utc};
use dialoguer::{theme::ColorfulTheme, Confirm};
use futures::{future, TryFutureExt, TryStreamExt};
use futures::{future, TryStreamExt};
use indicatif::{MultiProgress, ProgressBar, ProgressStyle};
use scylla::{
prepared_statement::PreparedStatement, FromRow, FromUserType, IntoUserType, Session,
@ -52,7 +52,7 @@ impl Initializer {
})
}
pub(crate) async fn setup(&self) -> Result<(), Error> {
pub(crate) async fn setup(&self, mt: bool) -> Result<(), Error> {
println!("Several tables in PostgreSQL are going to moved to ScyllaDB.");
let pool = Database::connect(&self.postgres_url).await?;
@ -77,9 +77,7 @@ impl Initializer {
}
println!("Copyping data from PostgreSQL to ScyllaDB.");
self.copy(pool.clone()).await?;
return Ok(());
self.copy(pool.clone(), mt).await?;
println!("Dropping constraints from PostgreSQL.");
let fk_pairs = vec![
@ -121,13 +119,13 @@ impl Initializer {
Ok(())
}
async fn copy(&self, db: DatabaseConnection) -> Result<(), Error> {
let note_prepared = self.scylla.prepare(INSERT_NOTE).await?;
let home_prepared = self.scylla.prepare(INSERT_HOME_TIMELINE).await?;
let reaction_prepared = self.scylla.prepare(INSERT_REACTION).await?;
let vote_insert_prepared = self.scylla.prepare(INSERT_POLL_VOTE).await?;
let vote_select_prepared = self.scylla.prepare(SELECT_POLL_VOTE).await?;
let notification_prepared = self.scylla.prepare(INSERT_NOTIFICATION).await?;
async fn copy(&self, db: DatabaseConnection, multi_thread: bool) -> Result<(), Error> {
let note_prepared = Arc::new(self.scylla.prepare(INSERT_NOTE).await?);
let home_prepared = Arc::new(self.scylla.prepare(INSERT_HOME_TIMELINE).await?);
let reaction_prepared = Arc::new(self.scylla.prepare(INSERT_REACTION).await?);
let vote_insert_prepared = Arc::new(self.scylla.prepare(INSERT_POLL_VOTE).await?);
let vote_select_prepared = Arc::new(self.scylla.prepare(SELECT_POLL_VOTE).await?);
let notification_prepared = Arc::new(self.scylla.prepare(INSERT_NOTIFICATION).await?);
let num_notes: i64 = note::Entity::find()
.select_only()
@ -168,12 +166,26 @@ impl Initializer {
.unwrap()
.progress_chars("#>-");
let mp = MultiProgress::new();
let note_pb = mp.add(ProgressBar::new(num_notes as u64).with_style(pb_style.to_owned()).with_prefix("Posts"));
let reaction_pb =
mp.add(ProgressBar::new(num_reactions as u64).with_style(pb_style.to_owned()).with_prefix("Reactions"));
let vote_pb = mp.add(ProgressBar::new(num_votes as u64).with_style(pb_style.to_owned()).with_prefix("Votes"));
let notification_pb =
mp.add(ProgressBar::new(num_notifications as u64).with_style(pb_style.to_owned()).with_prefix("Notifications"));
let note_pb = mp.add(
ProgressBar::new(num_notes as u64)
.with_style(pb_style.to_owned())
.with_prefix("Posts"),
);
let reaction_pb = mp.add(
ProgressBar::new(num_reactions as u64)
.with_style(pb_style.to_owned())
.with_prefix("Reactions"),
);
let vote_pb = mp.add(
ProgressBar::new(num_votes as u64)
.with_style(pb_style.to_owned())
.with_prefix("Votes"),
);
let notification_pb = mp.add(
ProgressBar::new(num_notifications as u64)
.with_style(pb_style.to_owned())
.with_prefix("Notifications"),
);
let mut tasks = Vec::new();
@ -189,11 +201,16 @@ impl Initializer {
home_prepared.clone(),
note_pb.clone(),
);
let handler = tokio::spawn(async move {
let _ = s.copy_note(note, d, h, n).await;
let f = async move {
s.copy_note(note, d, n, h).await.expect("Note copy failed");
p.inc(1);
});
};
if multi_thread {
let handler = tokio::spawn(f);
tasks.push(handler);
} else {
(|| f)().await;
}
}
let mut reactions = note_reaction::Entity::find()
@ -202,11 +219,18 @@ impl Initializer {
.await?;
while let Some(reaction) = reactions.try_next().await? {
let (s, r, p) = (self.clone(), reaction_prepared.clone(), reaction_pb.clone());
let handler = tokio::spawn(async move {
let _ = s.copy_reaction(reaction, r).await;
let f = async move {
s.copy_reaction(reaction, r)
.await
.expect("Reaction copy failed");
p.inc(1);
});
};
if multi_thread {
let handler = tokio::spawn(f);
tasks.push(handler);
} else {
(|| f)().await;
}
}
let mut votes = poll_vote::Entity::find()
@ -221,11 +245,18 @@ impl Initializer {
vote_insert_prepared.clone(),
vote_pb.clone(),
);
let handler = tokio::spawn(async move {
let _ = s.copy_vote(vote, d, sp, ip).await;
let f = async move {
s.copy_vote(vote, d, sp, ip)
.await
.expect("Vote copy failed");
p.inc(1);
});
};
if multi_thread {
let handler = tokio::spawn(f);
tasks.push(handler);
} else {
(|| f)().await;
}
}
let mut notifications = notification::Entity::find()
@ -239,11 +270,18 @@ impl Initializer {
notification_prepared.clone(),
notification_pb.clone(),
);
let handler = tokio::spawn(async move {
let _ = s.copy_notification(n, d, ps).await;
let f = async move {
s.copy_notification(n, d, ps)
.await
.expect("Notification copy failed");
p.inc(1);
});
};
if multi_thread {
let handler = tokio::spawn(f);
tasks.push(handler);
} else {
(|| f)().await;
}
}
future::join_all(tasks).await;
@ -255,8 +293,8 @@ impl Initializer {
&self,
note: note::Model,
db: DatabaseConnection,
insert_note: PreparedStatement,
insert_home: PreparedStatement,
insert_note: Arc<PreparedStatement>,
insert_home: Arc<PreparedStatement>,
) -> Result<(), Error> {
let reply = match &note.reply_id {
None => None,
@ -313,7 +351,7 @@ impl Initializer {
let scylla_note = NoteTable {
created_at_date: note.created_at.date_naive(),
created_at: note.created_at.into(),
id: note.id.to_owned(),
id: note.id.clone(),
visibility: note.visibility.to_value(),
content: note.text,
name: note.name,
@ -334,8 +372,8 @@ impl Initializer {
poll: poll_type,
thread_id: note.thread_id,
channel_id: note.channel_id,
user_id: note.user_id.to_owned(),
user_host: note.user_host,
user_id: note.user_id.clone(),
user_host: note.user_host.unwrap_or("local".to_string()),
reply_id: note.reply_id,
reply_user_id: note.reply_user_id,
reply_user_host: note.reply_user_host,
@ -358,19 +396,9 @@ impl Initializer {
.await?;
let mut home_tasks = Vec::new();
let mut local_followers = following::Entity::find()
.select_only()
.column(following::Column::FollowerId)
.filter(following::Column::FolloweeId.eq(note.user_id))
.filter(following::Column::FollowerHost.is_null())
.into_tuple::<String>()
.stream(&db)
.await?;
while let Some(follower_id) = local_followers.try_next().await? {
let s_note = scylla_note.to_owned();
let home = HomeTimelineTable {
feed_user_id: follower_id,
let s_note = scylla_note.clone();
let mut home = HomeTimelineTable {
feed_user_id: s_note.user_id.clone(),
created_at_date: s_note.created_at_date,
created_at: s_note.created_at,
id: s_note.id,
@ -395,7 +423,7 @@ impl Initializer {
thread_id: s_note.thread_id,
channel_id: s_note.channel_id,
user_id: s_note.user_id,
user_host: s_note.user_host,
user_host: s_note.user_host.clone(),
reply_id: s_note.reply_id,
reply_user_id: s_note.reply_user_id,
reply_user_host: s_note.reply_user_host,
@ -412,7 +440,21 @@ impl Initializer {
note_edit: s_note.note_edit,
updated_at: s_note.updated_at,
};
home_tasks.push(self.scylla.execute(&insert_home, home));
if s_note.user_host == "local" {
home_tasks.push(self.scylla.execute(&insert_home, home.clone()));
}
let mut local_followers = following::Entity::find()
.select_only()
.column(following::Column::FollowerId)
.filter(following::Column::FolloweeId.eq(note.user_id))
.filter(following::Column::FollowerHost.is_null())
.into_tuple::<String>()
.stream(&db)
.await?;
while let Some(follower_id) = local_followers.try_next().await? {
home.feed_user_id = follower_id;
home_tasks.push(self.scylla.execute(&insert_home, home.clone()));
}
future::try_join_all(home_tasks).await?;
@ -422,7 +464,7 @@ impl Initializer {
async fn copy_reaction(
&self,
reaction: note_reaction::Model,
insert: PreparedStatement,
insert: Arc<PreparedStatement>,
) -> Result<(), Error> {
let scylla_reaction = ReactionTable {
id: reaction.id,
@ -440,8 +482,8 @@ impl Initializer {
&self,
vote: poll_vote::Model,
db: DatabaseConnection,
select: PreparedStatement,
insert: PreparedStatement,
select: Arc<PreparedStatement>,
insert: Arc<PreparedStatement>,
) -> Result<(), Error> {
let voted_user = user::Entity::find_by_id(&vote.user_id).one(&db).await?;
if let Some(u) = voted_user {
@ -472,7 +514,7 @@ impl Initializer {
&self,
model: notification::Model,
db: DatabaseConnection,
insert: PreparedStatement,
insert: Arc<PreparedStatement>,
) -> Result<(), Error> {
let notifier = match model.notifier_id.to_owned() {
None => None,
@ -620,7 +662,7 @@ struct NoteTable {
thread_id: Option<String>,
channel_id: Option<String>,
user_id: String,
user_host: Option<String>,
user_host: String,
reply_id: Option<String>,
reply_user_id: Option<String>,
reply_user_host: Option<String>,
@ -683,7 +725,7 @@ INSERT INTO note (
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
"#;
#[derive(ValueList)]
#[derive(Clone, ValueList)]
struct HomeTimelineTable {
feed_user_id: String,
created_at_date: NaiveDate,
@ -710,7 +752,7 @@ struct HomeTimelineTable {
thread_id: Option<String>,
channel_id: Option<String>,
user_id: String,
user_host: Option<String>,
user_host: String,
reply_id: Option<String>,
reply_user_id: Option<String>,
reply_user_host: Option<String>,