use tokio::runtime; use std::{fs, path::Path, thread}; use tokio::sync::mpsc::channel; use sqlx::{migrate::Migrator, Connection, PgConnection, Row}; type JobResult = Result<(), ()>; pub async fn run_startup_jobs() { let (tx, mut rx) = channel::(1); thread::spawn(move || { let runtime = runtime::Builder::new_current_thread() .enable_all() .build().unwrap(); runtime.block_on(async { migrate_db().await; tx.send(Ok(())).await.unwrap() }); }); rx.recv().await; } async fn migrate_db() { println!(" - Migrating database"); let migrator = Migrator::new(Path::new("./src/db/migrations/")) .await.expect("Could not initialize migrations"); let mut database_connection = PgConnection::connect("postgres://localhost/photoxide").await.expect("Could not connect to DB"); let mut transaction = database_connection.begin().await.expect("Could not start transaction"); let procs : Vec = sqlx::query(" SELECT proname || '(' || oidvectortypes(proargtypes) || ')' as func_name FROM pg_proc INNER JOIN pg_namespace ns ON (pg_proc.pronamespace = ns.oid) WHERE ns.nspname = 'phtx' order by proname; ").fetch_all(&mut *transaction) .await.unwrap() .into_iter().map(|row| { row.try_get_unchecked("func_name").unwrap() }).collect(); let views : Vec = sqlx::query("select table_name from information_schema.views where table_schema = 'phtx'") .fetch_all(&mut *transaction) .await.unwrap() .into_iter().map(|row| { row.try_get_unchecked("table_name").unwrap() }).collect(); println!(" - Clearing stored procedures"); for proc in procs { sqlx::query(format!("drop function phtx.{proc}").as_str()) .execute(&mut *transaction) .await .unwrap(); } println!(" - Clearing views"); for view in views { sqlx::query(format!("drop view phtx.{view}").as_str()) .execute(&mut *transaction) .await .unwrap(); } let num_migrations = migrator.iter().len(); let suffix = if num_migrations == 1 {""} else {"s"}; println!(" - Running {num_migrations} table migration{suffix}"); migrator.run(&mut transaction).await.unwrap(); let view_scripts : Vec<(String, String)> = fs::read_dir(Path::new("./src/db/views/")).unwrap() .into_iter().map(|dir_entry| { (dir_entry.as_ref().unwrap().file_name().to_str().unwrap().to_string(), fs::read_to_string(dir_entry.unwrap().path()).unwrap()) }).collect(); let proc_scripts : Vec<(String, String)> = fs::read_dir(Path::new("./src/db/functions/")).unwrap() .into_iter().map(|dir_entry| { (dir_entry.as_ref().unwrap().file_name().to_str().unwrap().to_string(), fs::read_to_string(dir_entry.unwrap().path()).unwrap()) }).collect(); println!(" - Re-initializing views"); for (path, script) in view_scripts { println!(" - Running ./src/db/views/{path}"); sqlx::raw_sql(script.as_str()) .execute(&mut *transaction) .await.unwrap(); } println!(" - Re-initializing stored procedures"); for (path, script) in proc_scripts { println!(" - Running ./src/db/functions/{path}"); sqlx::raw_sql(script.as_str()) .execute(&mut *transaction) .await.unwrap(); } println!(" - Committing transaction"); transaction.commit().await.expect("Could not commit migrate transaction"); }