summary refs log tree commit diff
diff options
context:
space:
mode:
authorAshelyn Rose <git@ashen.earth>2024-10-01 00:00:12 -0600
committerAshelyn Rose <git@ashen.earth>2024-10-01 00:00:12 -0600
commitf8d2b25d1a4a5ef33b5343e5bff0a48a4a41a702 (patch)
treeca70e9b85699dfaaef85848188b9abfce0222dd4
parent4fcb9965c09782e7da2c8a316e1980ec192d181f (diff)
Allow system threads to crash and restart
-rw-r--r--Cargo.lock17
-rw-r--r--Cargo.toml2
-rw-r--r--src/main.rs75
3 files changed, 74 insertions, 20 deletions
diff --git a/Cargo.lock b/Cargo.lock
index f373c0f..f1f5c77 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -476,6 +476,12 @@ dependencies = [
 ]
 
 [[package]]
+name = "hermit-abi"
+version = "0.3.9"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d231dfb89cfffdbc30e7fc41579ed6066ad03abda9e567ccafae602b97ec5024"
+
+[[package]]
 name = "http"
 version = "0.2.11"
 source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -792,6 +798,16 @@ dependencies = [
 ]
 
 [[package]]
+name = "num_cpus"
+version = "1.16.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "4161fcb6d602d4d2081af7c3a45852d875a03dd337a6bfdd6e06407b61342a43"
+dependencies = [
+ "hermit-abi",
+ "libc",
+]
+
+[[package]]
 name = "object"
 version = "0.32.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -1471,6 +1487,7 @@ dependencies = [
  "bytes",
  "libc",
  "mio",
+ "num_cpus",
  "pin-project-lite",
  "socket2",
  "tokio-macros",
diff --git a/Cargo.toml b/Cargo.toml
index 1fcdeeb..c540b25 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -11,7 +11,7 @@ futures = "0.3.30"
 regex = "1.10.2"
 reqwest = "0.12"
 serde = { version = "1.0.196", features = [ "derive" ] }
-tokio = { version = "1.38.0", features = [ "rt", "macros", "time" ] }
+tokio = { version = "1.38.0", features = [ "rt", "macros", "time", "rt-multi-thread" ] }
 toml = "0.8.8"
 twilight-gateway = "0.15.4"
 twilight-http = "0.15.4"
diff --git a/src/main.rs b/src/main.rs
index e29f691..41f150b 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -2,28 +2,65 @@ mod config;
 mod system;
 mod listener;
 
-use std::thread::{self, JoinHandle};
-use tokio::runtime;
-
-use system::System;
+use std::{fs, thread};
+use tokio::{runtime, task::JoinSet};
 
 fn main() {
-    let config_str = include_str!("../config.toml");
-    let config = config::Config::load(config_str.to_string());
-
-    let handles : Vec<_> = config.systems.into_iter().map(|(system_name, system_config)| -> JoinHandle<()>  {
-        thread::spawn(move || {
-            let runtime = runtime::Builder::new_current_thread().enable_all().build().expect("Could not construct Tokio runtime");
-            runtime.block_on(async {
-                let mut system = System::new(system_name, system_config);
-                system.start_clients().await;
-            })
-        })
-    }).collect();
-
-    for thread_handle in handles.into_iter() {
-        thread_handle.join().expect("Child thread has panicked");
+    let initial_config = fs::read_to_string("./config.toml").expect("Could not find config file");
+    let config = config::Config::load(initial_config.to_string());
+
+    let waiter_pool = runtime::Builder::new_multi_thread()
+        .worker_threads(config.systems.len()).build().unwrap();
+    let mut waiters = JoinSet::new();
+
+    for (system_name, system_config) in config.systems.into_iter() {
+        let system_handle = spawn_system(&system_name, system_config);
+
+        waiters.spawn_blocking_on(move || -> String {
+            system_handle.join();
+            system_name
+        }, waiter_pool.handle());
     }
+
+    runtime::Builder::new_current_thread().build().unwrap().block_on(async {
+        while let Some(system_join) = waiters.join_next().await {
+            if let Ok(system_name) = system_join {
+                println!("Thread joined for system: {}. Updating config and restarting.", system_name);
+
+                let config_file = if let Ok(config_file) = fs::read_to_string("./config.toml") {
+                    config_file
+                } else {
+                    println!("Could not open config file, continuing with initial config");
+                    initial_config.clone()
+                };
+
+                let updated_config = config::Config::load(config_file);
+
+                if let Some((_, system_config)) = updated_config.systems.into_iter().find(|(name, _)| name.eq(&system_name)) {
+                    let system_handle = spawn_system(&system_name, system_config.clone());
+                    
+                    waiters.spawn_blocking_on(move || -> String {
+                        system_handle.join();
+                        system_name
+                    }, waiter_pool.handle());
+                } else {
+                    println!("New config file but this system no longer exists, exiting.");
+                }
+            }
+        }
+    })
+}
+
+fn spawn_system(system_name : &String, system_config: config::System) -> thread::JoinHandle<()> {
+    let name = system_name.clone();
+    thread::spawn(move || {
+        let thread_local_runtime = runtime::Builder::new_current_thread().enable_all().build().unwrap();
+
+        thread_local_runtime.block_on(async {
+            let mut system = system::System::new(name, system_config);
+            system.start_clients().await;
+        });
+    })
 }