summary refs log tree commit diff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/main.rs101
-rw-r--r--src/system/types.rs7
2 files changed, 69 insertions, 39 deletions
diff --git a/src/main.rs b/src/main.rs
index 4ed4fed..2fe1cf0 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -2,64 +2,87 @@
 
 mod config;
 mod system;
-use system::Manager;
+use system::{Manager, SystemThreadCommand};
 
-use std::{fs, panic};
-use tokio::{runtime, task::JoinSet};
+use std::{fs, thread::{self, sleep, JoinHandle}, time::Duration};
+use tokio::runtime;
 
 fn main() {
     let initial_config = fs::read_to_string("./config.toml").expect("Could not find config file");
     let config = config::Config::load(initial_config.to_string());
 
-    let waiter_pool = runtime::Builder::new_multi_thread()
-        .worker_threads(config.systems.len()).build().unwrap();
-    let mut waiters = JoinSet::new();
+    let mut join_handles = Vec::<(String, JoinHandle<_>)>::new();
 
-    for (system_name, system_config) in config.systems.into_iter() {
-        spawn_system(&mut waiters, &waiter_pool, &system_name, system_config);
+    for (system_name, system_config) in config.systems.iter() {
+        let handle = spawn_system(system_name, system_config.clone());
+
+        join_handles.push((system_name.clone(), handle));
     }
 
-    runtime::Builder::new_current_thread().build().unwrap().block_on(async {
-        while let Some(system_join) = waiters.join_next().await {
-            if let Ok(system_name) = system_join {
-                println!("Thread joined for system: {}. Updating config and restarting.", system_name);
-
-                let config_file = if let Ok(config_file) = fs::read_to_string("./config.toml") {
-                    config_file
-                } else {
-                    println!("Could not open config file, continuing with initial config");
-                    initial_config.clone()
-                };
-
-                let updated_config = config::Config::load(config_file);
-
-                if let Some((_, system_config)) = updated_config.systems.into_iter().find(|(name, _)| name.eq(&system_name)) {
-                    spawn_system(&mut waiters, &waiter_pool, &system_name, system_config.clone());
-                } else {
-                    println!("New config file but this system no longer exists, exiting.");
-                }
-            } else {
-                println!("Thread panicked");
+    loop {
+        if let Some(completed_index) = join_handles.iter().position(|(_, handle)| handle.is_finished()) {
+            let (name, next_join) = join_handles.swap_remove(completed_index);
+
+            match next_join.join() {
+                Err(err) => {
+                    println!("Thread for system {} panicked!", name);
+                    println!("{:?}", err);
+                },
+
+                Ok(SystemThreadCommand::Restart) => {
+                    println!("Thread for system {} requested restart", name);
+                    if let Some((_, config)) = config.systems.iter().find(|(system_name, _)| name == **system_name) {
+                        let handle = spawn_system(&name, config.clone());
+                        join_handles.push((name, handle));
+                    }
+                },
+
+                Ok(SystemThreadCommand::ShutdownSystem) => {
+                    println!("Thread for system {} requested shutdown", name);
+                    continue;
+                },
+                Ok(SystemThreadCommand::ReloadConfig) => {
+                    println!("Thread for system {} requested config reload", name);
+                    let config_file = if let Ok(config_file) = fs::read_to_string("./config.toml") {
+                        config_file
+                    } else {
+                        println!("Could not open config file, continuing with initial config");
+                        initial_config.clone()
+                    };
+
+                    let updated_config = config::Config::load(config_file);
+
+                    if let Some((_, system_config)) = updated_config.systems.into_iter().find(|(system_name, _)| *name == *system_name) {
+                        let handle = spawn_system(&name, system_config);
+                        join_handles.push((name.clone(), handle));
+                    } else {
+                        println!("New config file but this system no longer exists, exiting.");
+                        continue;
+                    }
+                },
+                Ok(SystemThreadCommand::ShutdownAll) => break,
             }
         }
-    })
+
+        sleep(Duration::from_secs(5));
+    }
 }
 
-fn spawn_system(joinset: &mut JoinSet<String>, pool: &tokio::runtime::Runtime, system_name : &String, system_config: config::System) {
+fn spawn_system(system_name : &String, system_config: config::System) -> JoinHandle<SystemThreadCommand> {
     let name = system_name.clone();
     let config = system_config.clone();
-    joinset.spawn_blocking_on(move || -> String {
+
+    thread::spawn(move || -> _ {
         let thread_local_runtime = runtime::Builder::new_current_thread().enable_all().build().unwrap();
 
-        let _ = panic::catch_unwind(|| {
-            thread_local_runtime.block_on(async {
-                let mut system = Manager::new(name.clone(), config);
-                system.start_clients().await;
-            });
+        // TODO: allow system manager runtime to return a command
+        thread_local_runtime.block_on(async {
+            let mut system = Manager::new(name, config);
+            system.start_clients().await;
         });
 
-        name
-    }, pool.handle());
+        SystemThreadCommand::Restart
+    })
 }
 
 
diff --git a/src/system/types.rs b/src/system/types.rs
index bef6d6d..9a410aa 100644
--- a/src/system/types.rs
+++ b/src/system/types.rs
@@ -41,3 +41,10 @@ pub enum SystemEvent {
     // Autoproxy
     AutoproxyTimeout(Timestamp),
 }
+
+pub enum SystemThreadCommand {
+    Restart,
+    ReloadConfig,
+    ShutdownSystem,
+    ShutdownAll,
+}