summary refs log tree commit diff
path: root/src/jobs/startup.rs
blob: 6ca184993f5f1918bf0d926e44e998141b17aabb (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
use tokio::runtime;
use std::{fs, path::Path, thread};
use tokio::sync::mpsc::channel;

use sqlx::{migrate::Migrator, Connection, PgConnection, Row};

/// Completion message sent back over the channel by the startup-jobs thread
/// in `run_startup_jobs`; neither variant carries a payload.
type JobResult = Result<(), ()>;

/// Runs the startup jobs (currently the database migration) and waits until
/// they have finished.
///
/// The jobs are executed on a separately spawned OS thread that builds its own
/// current-thread Tokio runtime. Completion is signalled back to the caller
/// over a bounded channel.
///
/// # Panics
///
/// Panics if the worker thread ends without reporting success. This covers the
/// case where `migrate_db` panics: the sender is then dropped without a
/// message and `recv` yields `None`, which previously was silently treated as
/// a successful startup.
pub async fn run_startup_jobs() {
    let (tx, mut rx) = channel::<JobResult>(1);

    thread::spawn(move || {
        let runtime = runtime::Builder::new_current_thread()
            .enable_all()
            .build()
            .expect("Could not build the startup jobs runtime");

        runtime.block_on(async {
            migrate_db().await;

            tx.send(Ok(()))
                .await
                .expect("Startup jobs result receiver was dropped")
        });
    });

    // `None` means the sender was dropped without sending, i.e. the worker
    // thread did not reach the `send` above.
    match rx.recv().await {
        Some(Ok(())) => {}
        Some(Err(())) | None => panic!("Startup jobs did not complete successfully"),
    }
}

/// Brings the database schema up to date inside a single transaction.
///
/// Steps, in order:
/// 1. List the functions and views that currently exist in the `phtx` schema.
/// 2. Drop all of those functions, then all of those views.
/// 3. Run the table migrations from `./src/db/migrations/`.
/// 4. Re-create the views from `./src/db/views/`, then the functions from
///    `./src/db/functions/`, running the scripts of each directory in file
///    name order.
/// 5. Commit the transaction.
///
/// # Panics
///
/// Panics if any step fails (connecting, querying, reading a script from
/// disk, running a script or committing).
async fn migrate_db() {
    println!(" - Migrating database");

    let migrator = Migrator::new(Path::new("./src/db/migrations/"))
        .await
        .expect("Could not initialize migrations");

    let mut database_connection = PgConnection::connect("postgres://localhost/photoxide")
        .await
        .expect("Could not connect to DB");
    let mut transaction = database_connection
        .begin()
        .await
        .expect("Could not start transaction");

    // `quote_ident` makes the returned names safe to splice into the
    // `drop function` statements below even when they are mixed-case or
    // contain characters that require quoting.
    let procs: Vec<String> = sqlx::query("
        SELECT quote_ident(proname) || '(' || oidvectortypes(proargtypes) || ')' as func_name
            FROM pg_proc INNER JOIN pg_namespace ns ON (pg_proc.pronamespace = ns.oid)
            WHERE ns.nspname = 'phtx'
        order by proname;
    ")
        .fetch_all(&mut *transaction)
        .await
        .expect("Could not list existing stored procedures")
        .into_iter()
        .map(|row| {
            row.try_get_unchecked("func_name")
                .expect("Could not read column func_name")
        })
        .collect();

    // Same quoting as above, for the `drop view` statements.
    let views: Vec<String> = sqlx::query("select quote_ident(table_name::text) as table_name from information_schema.views where table_schema = 'phtx'")
        .fetch_all(&mut *transaction)
        .await
        .expect("Could not list existing views")
        .into_iter()
        .map(|row| {
            row.try_get_unchecked("table_name")
                .expect("Could not read column table_name")
        })
        .collect();

    println!("  - Clearing stored procedures");
    for proc in procs {
        sqlx::query(format!("drop function phtx.{proc}").as_str())
            .execute(&mut *transaction)
            .await
            .unwrap_or_else(|err| panic!("Could not drop function phtx.{proc}: {err}"));
    }

    // NOTE(review): views are dropped one by one without CASCADE, so this
    // presumably relies on the views not depending on each other - confirm.
    println!("  - Clearing views");
    for view in views {
        sqlx::query(format!("drop view phtx.{view}").as_str())
            .execute(&mut *transaction)
            .await
            .unwrap_or_else(|err| panic!("Could not drop view phtx.{view}: {err}"));
    }

    let num_migrations = migrator.iter().len();
    let suffix = if num_migrations == 1 { "" } else { "s" };
    println!("  - Running {num_migrations} table migration{suffix}");
    migrator
        .run(&mut transaction)
        .await
        .expect("Could not run table migrations");

    // Both script sets are read from disk before any of them is executed, so
    // an unreadable file aborts before views or functions are re-created.
    let view_scripts = read_sql_scripts(Path::new("./src/db/views/"));
    let proc_scripts = read_sql_scripts(Path::new("./src/db/functions/"));

    println!("  - Re-initializing views");
    for (path, script) in view_scripts {
        println!("   - Running ./src/db/views/{path}");
        sqlx::raw_sql(script.as_str())
            .execute(&mut *transaction)
            .await
            .unwrap_or_else(|err| panic!("Could not run ./src/db/views/{path}: {err}"));
    }

    println!("  - Re-initializing stored procedures");
    for (path, script) in proc_scripts {
        println!("   - Running ./src/db/functions/{path}");
        sqlx::raw_sql(script.as_str())
            .execute(&mut *transaction)
            .await
            .unwrap_or_else(|err| panic!("Could not run ./src/db/functions/{path}: {err}"));
    }

    println!("  - Committing transaction");
    transaction
        .commit()
        .await
        .expect("Could not commit migrate transaction");
}

/// Reads every entry of `dir` and returns `(file name, file contents)` pairs
/// sorted by file name.
///
/// `fs::read_dir` yields entries in a platform- and filesystem-dependent
/// order; sorting makes the order in which the scripts are later executed
/// reproducible.
///
/// # Panics
///
/// Panics if the directory cannot be listed, if a file name is not valid
/// UTF-8, or if an entry cannot be read as a UTF-8 string.
fn read_sql_scripts(dir: &Path) -> Vec<(String, String)> {
    let mut scripts: Vec<(String, String)> = fs::read_dir(dir)
        .unwrap_or_else(|err| panic!("Could not list {}: {err}", dir.display()))
        .map(|dir_entry| {
            let dir_entry = dir_entry.unwrap_or_else(|err| {
                panic!("Could not read an entry of {}: {err}", dir.display())
            });
            let path = dir_entry.path();
            let name = dir_entry.file_name().into_string().unwrap_or_else(|name| {
                panic!("File name {name:?} in {} is not valid UTF-8", dir.display())
            });
            let script = fs::read_to_string(&path)
                .unwrap_or_else(|err| panic!("Could not read {}: {err}", path.display()));
            (name, script)
        })
        .collect();

    scripts.sort_by(|a, b| a.0.cmp(&b.0));
    scripts
}