From 26a587e223e4c91601e79f7f3a69e8b2a0cbf0b2 Mon Sep 17 00:00:00 2001 From: Ashelyn Dawn Date: Mon, 7 Oct 2024 19:23:34 -0600 Subject: Manually check system threads every few seconds --- src/main.rs | 101 +++++++++++++++++++++++++++++++++++++----------------------- 1 file changed, 62 insertions(+), 39 deletions(-) (limited to 'src/main.rs') diff --git a/src/main.rs b/src/main.rs index 4ed4fed..2fe1cf0 100644 --- a/src/main.rs +++ b/src/main.rs @@ -2,64 +2,87 @@ mod config; mod system; -use system::Manager; +use system::{Manager, SystemThreadCommand}; -use std::{fs, panic}; -use tokio::{runtime, task::JoinSet}; +use std::{fs, thread::{self, sleep, JoinHandle}, time::Duration}; +use tokio::runtime; fn main() { let initial_config = fs::read_to_string("./config.toml").expect("Could not find config file"); let config = config::Config::load(initial_config.to_string()); - let waiter_pool = runtime::Builder::new_multi_thread() - .worker_threads(config.systems.len()).build().unwrap(); - let mut waiters = JoinSet::new(); + let mut join_handles = Vec::<(String, JoinHandle<_>)>::new(); - for (system_name, system_config) in config.systems.into_iter() { - spawn_system(&mut waiters, &waiter_pool, &system_name, system_config); + for (system_name, system_config) in config.systems.iter() { + let handle = spawn_system(system_name, system_config.clone()); + + join_handles.push((system_name.clone(), handle)); } - runtime::Builder::new_current_thread().build().unwrap().block_on(async { - while let Some(system_join) = waiters.join_next().await { - if let Ok(system_name) = system_join { - println!("Thread joined for system: {}. Updating config and restarting.", system_name); - - let config_file = if let Ok(config_file) = fs::read_to_string("./config.toml") { - config_file - } else { - println!("Could not open config file, continuing with initial config"); - initial_config.clone() - }; - - let updated_config = config::Config::load(config_file); - - if let Some((_, system_config)) = updated_config.systems.into_iter().find(|(name, _)| name.eq(&system_name)) { - spawn_system(&mut waiters, &waiter_pool, &system_name, system_config.clone()); - } else { - println!("New config file but this system no longer exists, exiting."); - } - } else { - println!("Thread panicked"); + loop { + if let Some(completed_index) = join_handles.iter().position(|(_, handle)| handle.is_finished()) { + let (name, next_join) = join_handles.swap_remove(completed_index); + + match next_join.join() { + Err(err) => { + println!("Thread for system {} panicked!", name); + println!("{:?}", err); + }, + + Ok(SystemThreadCommand::Restart) => { + println!("Thread for system {} requested restart", name); + if let Some((_, config)) = config.systems.iter().find(|(system_name, _)| name == **system_name) { + let handle = spawn_system(&name, config.clone()); + join_handles.push((name, handle)); + } + }, + + Ok(SystemThreadCommand::ShutdownSystem) => { + println!("Thread for system {} requested shutdown", name); + continue; + }, + Ok(SystemThreadCommand::ReloadConfig) => { + println!("Thread for system {} requested config reload", name); + let config_file = if let Ok(config_file) = fs::read_to_string("./config.toml") { + config_file + } else { + println!("Could not open config file, continuing with initial config"); + initial_config.clone() + }; + + let updated_config = config::Config::load(config_file); + + if let Some((_, system_config)) = updated_config.systems.into_iter().find(|(system_name, _)| *name == *system_name) { + let handle = spawn_system(&name, system_config); + join_handles.push((name.clone(), handle)); + } else { + println!("New config file but this system no longer exists, exiting."); + continue; + } + }, + Ok(SystemThreadCommand::ShutdownAll) => break, } } - }) + + sleep(Duration::from_secs(5)); + } } -fn spawn_system(joinset: &mut JoinSet, pool: &tokio::runtime::Runtime, system_name : &String, system_config: config::System) { +fn spawn_system(system_name : &String, system_config: config::System) -> JoinHandle { let name = system_name.clone(); let config = system_config.clone(); - joinset.spawn_blocking_on(move || -> String { + + thread::spawn(move || -> _ { let thread_local_runtime = runtime::Builder::new_current_thread().enable_all().build().unwrap(); - let _ = panic::catch_unwind(|| { - thread_local_runtime.block_on(async { - let mut system = Manager::new(name.clone(), config); - system.start_clients().await; - }); + // TODO: allow system manager runtime to return a command + thread_local_runtime.block_on(async { + let mut system = Manager::new(name, config); + system.start_clients().await; }); - name - }, pool.handle()); + SystemThreadCommand::Restart + }) } -- cgit 1.4.1