1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
|
use tokio::runtime;
use std::{fs, path::Path, thread};
use tokio::sync::mpsc::channel;
use sqlx::{migrate::Migrator, Connection, PgConnection, Row};
/// Outcome message passed over the channel from the startup job thread to
/// [`run_startup_jobs`]. Neither variant carries a payload.
type JobResult = Result<(), ()>;
/// Runs the startup jobs (currently only [`migrate_db`]) and waits for them
/// to finish.
///
/// The jobs execute on a dedicated OS thread that builds its own
/// current-thread Tokio runtime; the outcome is reported back to the caller
/// over a bounded channel, so the calling task only awaits the channel.
///
/// # Panics
///
/// Panics if the job thread reports a failure or terminates without
/// reporting anything (for example because `migrate_db` panicked or the
/// runtime could not be built). Previously that case was silently treated
/// as success.
pub async fn run_startup_jobs() {
    let (tx, mut rx) = channel::<JobResult>(1);

    thread::spawn(move || {
        let runtime = runtime::Builder::new_current_thread()
            .enable_all()
            .build()
            .expect("Could not build the startup job runtime");

        runtime.block_on(async {
            migrate_db().await;
            tx.send(Ok(()))
                .await
                .expect("Could not report startup job completion");
        });
    });

    // `recv` yields `None` once the sender is dropped without having sent a
    // message, which is what happens when the job thread panics.
    match rx.recv().await {
        Some(Ok(())) => {}
        Some(Err(())) => panic!("Startup jobs reported a failure"),
        None => panic!("Startup job thread terminated without reporting a result"),
    }
}
/// Brings the `phtx` schema up to date inside a single transaction.
///
/// Steps, in order:
/// 1. List and drop every function and view currently in the `phtx` schema.
/// 2. Run the table migrations found in `./src/db/migrations/`.
/// 3. Re-create the views from `./src/db/views/`, then the stored procedures
///    from `./src/db/functions/`. Within each directory the scripts run in
///    file-name order so the sequence is reproducible.
/// 4. Commit the transaction.
///
/// All paths are relative to the process's current working directory.
///
/// # Panics
///
/// Panics on any database or file-system failure; in that case the commit at
/// the end is never reached.
async fn migrate_db() {
    println!(" - Migrating database");
    let migrator = Migrator::new(Path::new("./src/db/migrations/"))
        .await
        .expect("Could not initialize migrations");
    let mut database_connection = PgConnection::connect("postgres://localhost/photoxide")
        .await
        .expect("Could not connect to DB");
    let mut transaction = database_connection
        .begin()
        .await
        .expect("Could not start transaction");

    // Function signatures (`name(argtypes)`) are needed because `drop function`
    // must identify the exact overload.
    let procs: Vec<String> = sqlx::query("
SELECT proname || '(' || oidvectortypes(proargtypes) || ')' as func_name
FROM pg_proc INNER JOIN pg_namespace ns ON (pg_proc.pronamespace = ns.oid)
WHERE ns.nspname = 'phtx'
order by proname;
")
    .fetch_all(&mut *transaction)
    .await
    .expect("Could not list stored procedures")
    .into_iter()
    .map(|row| {
        row.try_get_unchecked("func_name")
            .expect("Could not read stored procedure name")
    })
    .collect();

    // NOTE(review): the unchecked getter skips sqlx's type-compatibility check;
    // presumably required because information_schema columns use domain
    // types — confirm before switching to `try_get`.
    let views: Vec<String> =
        sqlx::query("select table_name from information_schema.views where table_schema = 'phtx'")
            .fetch_all(&mut *transaction)
            .await
            .expect("Could not list views")
            .into_iter()
            .map(|row| {
                row.try_get_unchecked("table_name")
                    .expect("Could not read view name")
            })
            .collect();

    println!(" - Clearing stored procedures");
    for proc in procs {
        sqlx::query(format!("drop function phtx.{proc}").as_str())
            .execute(&mut *transaction)
            .await
            .unwrap_or_else(|err| panic!("Could not drop function phtx.{proc}: {err}"));
    }

    println!(" - Clearing views");
    for view in views {
        sqlx::query(format!("drop view phtx.{view}").as_str())
            .execute(&mut *transaction)
            .await
            .unwrap_or_else(|err| panic!("Could not drop view phtx.{view}: {err}"));
    }

    let num_migrations = migrator.iter().len();
    let suffix = if num_migrations == 1 { "" } else { "s" };
    println!(" - Running {num_migrations} table migration{suffix}");
    migrator
        .run(&mut transaction)
        .await
        .expect("Could not run table migrations");

    let view_scripts = read_sql_scripts(Path::new("./src/db/views/"));
    let proc_scripts = read_sql_scripts(Path::new("./src/db/functions/"));

    println!(" - Re-initializing views");
    for (path, script) in view_scripts {
        println!(" - Running ./src/db/views/{path}");
        sqlx::raw_sql(script.as_str())
            .execute(&mut *transaction)
            .await
            .unwrap_or_else(|err| panic!("Could not run ./src/db/views/{path}: {err}"));
    }

    println!(" - Re-initializing stored procedures");
    for (path, script) in proc_scripts {
        println!(" - Running ./src/db/functions/{path}");
        sqlx::raw_sql(script.as_str())
            .execute(&mut *transaction)
            .await
            .unwrap_or_else(|err| panic!("Could not run ./src/db/functions/{path}: {err}"));
    }

    println!(" - Committing transaction");
    transaction
        .commit()
        .await
        .expect("Could not commit migrate transaction");
}

/// Reads every entry of `dir` as a UTF-8 text file and returns
/// `(file name, contents)` pairs sorted by file name.
///
/// Sorting matters because `fs::read_dir` yields entries in a platform- and
/// filesystem-dependent order.
///
/// # Panics
///
/// Panics if the directory or any entry cannot be read, or if a file name is
/// not valid UTF-8.
fn read_sql_scripts(dir: &Path) -> Vec<(String, String)> {
    let mut scripts: Vec<(String, String)> = fs::read_dir(dir)
        .unwrap_or_else(|err| panic!("Could not read directory {}: {err}", dir.display()))
        .map(|dir_entry| {
            let dir_entry = dir_entry.unwrap_or_else(|err| {
                panic!("Could not read an entry of {}: {err}", dir.display())
            });
            let file_name = dir_entry
                .file_name()
                .into_string()
                .unwrap_or_else(|name| panic!("File name {name:?} is not valid UTF-8"));
            let script_path = dir_entry.path();
            let script = fs::read_to_string(&script_path).unwrap_or_else(|err| {
                panic!("Could not read {}: {err}", script_path.display())
            });
            (file_name, script)
        })
        .collect();

    // File names are unique within one directory, so an unstable sort is fine.
    scripts.sort_unstable_by(|(left, _), (right, _)| left.cmp(right));
    scripts
}
|