From a69d779cfff810edd375318c3d36fecd42d294b5 Mon Sep 17 00:00:00 2001 From: Ashelyn Rose Date: Thu, 7 Nov 2024 20:07:25 -0700 Subject: Runtime ui --- src/config.rs | 1 + src/main.rs | 195 +++++++++++++++++++++++++++++++++++++++---- src/system/message_parser.rs | 14 +++- src/system/mod.rs | 84 +++++++++++++++---- 4 files changed, 258 insertions(+), 36 deletions(-) (limited to 'src') diff --git a/src/config.rs b/src/config.rs index 28fe422..efbb50d 100644 --- a/src/config.rs +++ b/src/config.rs @@ -55,6 +55,7 @@ pub struct System { pub forward_pings: bool, pub autoproxy: Option, pub pluralkit: Option, + pub ui_color: Option, } fn default_forward_pings() -> bool { diff --git a/src/main.rs b/src/main.rs index 6e37938..287cd4f 100644 --- a/src/main.rs +++ b/src/main.rs @@ -2,41 +2,141 @@ mod config; mod system; +use crossterm::{cursor::{self, MoveTo}, terminal::{Clear, ClearType, DisableLineWrap, EnableLineWrap, EnterAlternateScreen, LeaveAlternateScreen}}; use system::{Manager, SystemThreadCommand}; -use std::{fs, thread::{self, sleep, JoinHandle}, time::Duration, sync::mpsc}; +use std::{collections::{HashMap, VecDeque}, fs, io::{self, Write}, sync::mpsc, thread::{self, sleep, JoinHandle}, time::Duration}; use tokio::runtime; +pub struct UiState { + pub systems: HashMap, + pub logs: VecDeque, +} + +pub enum SystemState { + Running(HashMap), + Reloading, + Restarting, + Shutdown, +} + +pub struct MemberState { + pub connected: bool, + pub autoproxied: bool, +} + +pub enum SystemUiEvent { + SystemClose, + MemberAutoproxy(Option), + GatewayDisconnect(String), + GatewayConnect(String), + LogLine(String), +} + +const MAX_LOG : usize = 1000; + fn main() { let initial_config = fs::read_to_string("./config.toml").expect("Could not find config file"); let config = config::Config::load(initial_config.to_string()); - let (waker, waiter) = mpsc::channel::<()>(); + let (waker, waiter) = mpsc::channel::<(String, SystemUiEvent)>(); let mut join_handles = Vec::<(String, JoinHandle<_>)>::new(); + let mut ui_state = UiState { + systems: HashMap::new(), + logs: VecDeque::new(), + }; + for (system_name, system_config) in config.systems.iter() { let handle = spawn_system(system_name, system_config.clone(), waker.clone()); + let mut member_states = HashMap::new(); + for member_name in system_config.members.iter() { + member_states.insert(member_name.name.clone(), MemberState { + connected: false, + autoproxied: false, + }); + } + + let system_state = SystemState::Running(member_states); + ui_state.systems.insert(system_name.clone(), system_state); + join_handles.push((system_name.clone(), handle)); } + crossterm::execute!(io::stdout(), EnterAlternateScreen).unwrap(); + crossterm::execute!(io::stdout(), DisableLineWrap).unwrap(); + loop { - // Check manually every 10 seconds just in case - let _ = waiter.recv_timeout(Duration::from_secs(10)); + // Wait for an event from one of the threads + let ui_event = waiter.recv_timeout(Duration::from_millis(500)); + + if let Ok((system_name, ui_event)) = ui_event { + let system_state = ui_state.systems.get_mut(&system_name).unwrap(); + match system_state { + SystemState::Running(member_states) => match ui_event { + // We will check for the join in a second + SystemUiEvent::SystemClose => (), + + SystemUiEvent::MemberAutoproxy(member_name) => { + member_states.iter_mut().for_each(|(_, member_state)| { + member_state.autoproxied = false; + }); + + if let Some(member_name) = member_name { + member_states.get_mut(&member_name).unwrap() + .autoproxied = true; + } + }, + + SystemUiEvent::GatewayDisconnect(member_name) => { + member_states.get_mut(&member_name).unwrap() + .connected = false; + }, + + SystemUiEvent::GatewayConnect(member_name) => { + member_states.get_mut(&member_name).unwrap() + .connected = true; + + }, + + SystemUiEvent::LogLine(log) => { + if log.len() == MAX_LOG { + let _ = ui_state.logs.pop_front(); + } + + ui_state.logs.push_back( + format!("{system_name:>8.8}: {log}") + ); + }, + + }, + _ => (), + } + } // Just to make sure the join handle is updated by the time we check - sleep(Duration::from_millis(100)); + sleep(Duration::from_millis(10)); if let Some(completed_index) = join_handles.iter().position(|(_, handle)| handle.is_finished()) { let (name, next_join) = join_handles.swap_remove(completed_index); match next_join.join() { Err(err) => { - println!("Thread for system {} panicked!", name); - println!("{:?}", err); + let _ = ui_state.systems.insert(name.clone(), SystemState::Shutdown); + ui_state.logs.push_back( + format!("Thread for system {} panicked!", name) + ); + + ui_state.logs.push_back( + format!("{:?}", err) + ); }, Ok(SystemThreadCommand::Restart) => { - println!("Thread for system {} requested restart", name); + let _ = ui_state.systems.insert(name.clone(), SystemState::Restarting); + ui_state.logs.push_back( + format!("Thread for system {} requested restart", name) + ); if let Some((_, config)) = config.systems.iter().find(|(system_name, _)| name == **system_name) { let handle = spawn_system(&name, config.clone(), waker.clone()); join_handles.push((name, handle)); @@ -44,15 +144,24 @@ fn main() { }, Ok(SystemThreadCommand::ShutdownSystem) => { - println!("Thread for system {} requested shutdown", name); + let _ = ui_state.systems.insert(name.clone(), SystemState::Shutdown); + ui_state.logs.push_back( + format!("Thread for system {} requested shutdown", name) + ); continue; }, + Ok(SystemThreadCommand::ReloadConfig) => { - println!("Thread for system {} requested config reload", name); + let _ = ui_state.systems.insert(name.clone(), SystemState::Reloading); + ui_state.logs.push_back( + format!("Thread for system {} requested config reload", name) + ); let config_file = if let Ok(config_file) = fs::read_to_string("./config.toml") { config_file } else { - println!("Could not open config file, continuing with initial config"); + ui_state.logs.push_back( + format!("Could not open config file, continuing with initial config") + ); initial_config.clone() }; @@ -62,17 +171,24 @@ fn main() { let handle = spawn_system(&name, system_config, waker.clone()); join_handles.push((name.clone(), handle)); } else { - println!("New config file but this system no longer exists, exiting."); + ui_state.logs.push_back( + format!("New config file but this system no longer exists, exiting.") + ); continue; } }, Ok(SystemThreadCommand::ShutdownAll) => break, } } + + update_ui(&ui_state, &config); } + + crossterm::execute!(io::stdout(), EnableLineWrap).unwrap(); + crossterm::execute!(io::stdout(), LeaveAlternateScreen).unwrap(); } -fn spawn_system(system_name : &String, system_config: config::System, waker: mpsc::Sender<()>) -> JoinHandle { +fn spawn_system(system_name : &String, system_config: config::System, waker: mpsc::Sender<(String, SystemUiEvent)>) -> JoinHandle { let name = system_name.clone(); let config = system_config.clone(); @@ -80,16 +196,63 @@ fn spawn_system(system_name : &String, system_config: config::System, waker: mps .name(format!("seance_{}", &name)) .spawn(move || -> _ { let thread_local_runtime = runtime::Builder::new_current_thread().enable_all().build().unwrap(); + let dup_waker = waker.clone(); - // TODO: allow system manager runtime to return a command thread_local_runtime.block_on(async { - let mut system = Manager::new(name, config); + let mut system = Manager::new(name.clone(), config, waker); system.start_clients().await; }); - let _ = waker.send(()); + let _ = dup_waker.send((name, SystemUiEvent::SystemClose)); SystemThreadCommand::Restart }).unwrap() } +fn update_ui(ui_state: &UiState, config: &config::Config) { + crossterm::execute!(io::stdout(), Clear(ClearType::FromCursorUp)).unwrap(); + crossterm::execute!(io::stdout(), MoveTo(0, 0)).unwrap(); + + let (width, height) = crossterm::terminal::size().unwrap(); + let status_lines = (ui_state.systems.len() * 2) + ui_state.systems.values().map(|system| match system { + SystemState::Running(members) => members.len(), + SystemState::Reloading => 1, + SystemState::Restarting => 1, + SystemState::Shutdown => 1, + } ).sum::() + 1; + + let log_space = height as usize - status_lines - 1; + let log_height = ui_state.logs.len(); + + for (name, state) in ui_state.systems.iter() { + println!("{name}"); + match state { + SystemState::Shutdown => println!(" - [System stopped]"), + SystemState::Reloading => println!(" - [System reloading]"), + SystemState::Restarting => println!(" - [System restarting]"), + SystemState::Running(members) => for (name, state) in members { + if !state.connected { + println!(" - {name} (connecting)") + } else if state.autoproxied { + println!(" - {name} (autoproxy)") + } else { + println!(" - {name}") + } + }, + } + + println!(""); + } + + println!("{:-), LatchClear(MemberId), // TODO: Figure out how to represent emotes @@ -28,6 +28,7 @@ pub enum Command { Reproxy(MemberId, MessageId), Delete(MessageId), Nick(MemberId, String), + Log(String), ReloadSystemConfig, ExitSéance, UnknownCommand, @@ -51,13 +52,14 @@ impl MessageParser { } if message.content.starts_with(r"\") { - return ParsedMessage::UnproxiedMessage + return ParsedMessage::UnproxiedMessage(None) } if message.content.starts_with(r"!") { if let Some(parse) = MessageParser::check_command(message, secondary_message, system_config, latch_state) { return ParsedMessage::Command(parse); - + } else { + return ParsedMessage::UnproxiedMessage(Some(format!("Unknown command string: {}", message.content))); } } @@ -76,7 +78,7 @@ impl MessageParser { } // If nothing else - ParsedMessage::UnproxiedMessage + ParsedMessage::UnproxiedMessage(None) } fn check_command(message: &FullMessage, secondary_message: Option<&FullMessage>, system_config: &System, latch_state: Option<(MemberId, Timestamp)>) -> Option { @@ -86,6 +88,10 @@ impl MessageParser { match first_word { None => return None, Some(command_name) => match command_name { + "log" => { + let remainder = words.remainder().unwrap().to_string(); + return Some(Command::Log(remainder)); + }, "edit" => { let editing_member = Self::get_member_id_from_user_id(secondary_message.as_ref().unwrap().author.id, system_config).unwrap(); return Some(Command::Edit(editing_member, secondary_message.unwrap().id, words.remainder().unwrap().to_string())); diff --git a/src/system/mod.rs b/src/system/mod.rs index 44c6da8..e2bd1ee 100644 --- a/src/system/mod.rs +++ b/src/system/mod.rs @@ -1,5 +1,6 @@ use std::{collections::HashMap, num::NonZeroUsize, str::FromStr, time::Duration}; +use std::sync::mpsc::Sender as ThreadSender; use lru::LruCache; use tokio::{ sync::mpsc::{channel, Sender}, @@ -10,6 +11,7 @@ use twilight_model::{channel::message::{MessageReference, MessageType, ReactionT use twilight_model::util::Timestamp; use crate::config::{AutoproxyConfig, AutoproxyLatchScope, Member}; +use crate::SystemUiEvent; mod aggregator; mod bot; @@ -33,10 +35,11 @@ pub struct Manager { pub aggregator: MessageAggregator, pub send_cache: LruCache, pub reference_user_id: UserId, + pub ui_sender: ThreadSender<(String, SystemUiEvent)>, } impl Manager { - pub fn new(system_name: String, system_config: crate::config::System) -> Self { + pub fn new(system_name: String, system_config: crate::config::System, ui_sender : ThreadSender<(String, SystemUiEvent)>) -> Self { Self { reference_user_id: Id::from_str(&system_config.reference_user_id.as_str()) .expect(format!("Invalid user id for system {}", &system_name).as_str()), @@ -47,6 +50,7 @@ impl Manager { latch_state: None, system_sender: None, send_cache: LruCache::new(NonZeroUsize::new(15).unwrap()), + ui_sender, } } @@ -71,7 +75,9 @@ impl Manager { } pub async fn start_clients(&mut self) { - println!("Starting clients for system {}", self.name); + let _ = self.ui_sender.send((self.name.clone(), SystemUiEvent::LogLine( + format!("Starting clients for system {}", self.name) + ))); let (system_sender, mut system_receiver) = channel::(100); self.system_sender = Some(system_sender.clone()); @@ -83,7 +89,9 @@ impl Manager { } if self.config.members.len() < 1 { - println!("WARNING: System {} has no configured members", &self.name); + let _ = self.ui_sender.send((self.name.clone(), SystemUiEvent::LogLine( + format!("WARNING: System {} has no configured members", &self.name) + ))); } loop { @@ -94,19 +102,29 @@ impl Manager { let member = self.find_member_by_id(member_id).unwrap(); - println!("Gateway client {} ({}) connected", member.name, member_id); + let _ = self.ui_sender.send((self.name.clone(), SystemUiEvent::GatewayConnect(member.name.clone()))); + + let _ = self.ui_sender.send((self.name.clone(), SystemUiEvent::LogLine( + format!("Gateway client {} ({}) connected", member.name, member_id) + ))); } Some(SystemEvent::GatewayError(member_id, message)) => { let member = self.find_member_by_id(member_id).unwrap(); - println!("Gateway client {} ran into error {}", member.name, message); + let _ = self.ui_sender.send((self.name.clone(), SystemUiEvent::LogLine( + format!("Gateway client {} ran into error {}", member.name, message) + ))); } Some(SystemEvent::GatewayClosed(member_id)) => { let member = self.find_member_by_id(member_id).unwrap(); - println!("Gateway client {} closed", member.name); + let _ = self.ui_sender.send((self.name.clone(), SystemUiEvent::GatewayDisconnect(member.name.clone()))); + + let _ = self.ui_sender.send((self.name.clone(), SystemUiEvent::LogLine( + format!("Gateway client {} closed", member.name) + ))); self.start_bot(member_id).await; } @@ -123,7 +141,11 @@ impl Manager { Some(SystemEvent::AutoproxyTimeout(time_scheduled)) => { if let Some((_member, current_last_message)) = self.latch_state.clone() { if current_last_message == time_scheduled { - println!("Autoproxy timeout has expired: {} (last sent), {} (timeout scheduled)", current_last_message.as_secs(), time_scheduled.as_secs()); + let _ = self.ui_sender.send((self.name.clone(), SystemUiEvent::MemberAutoproxy(None))); + + let _ = self.ui_sender.send((self.name.clone(), SystemUiEvent::LogLine( + format!("Autoproxy timeout has expired: {} (last sent), {} (timeout scheduled)", current_last_message.as_secs(), time_scheduled.as_secs()) + ))); self.latch_state = None; self.update_status_of_system().await; } @@ -195,7 +217,9 @@ impl Manager { if let Some(last) = last_in_channel { self.send_cache.put(message.channel_id, last); } else { - println!("WARNING: Could not look up most recent message in channel {}", message.channel_id); + let _ = self.ui_sender.send((self.name.clone(), SystemUiEvent::LogLine( + format!("WARNING: Could not look up most recent message in channel {}", message.channel_id) + ))); }; // Return the message referenced from cache so there's no unnecessary clone @@ -206,12 +230,17 @@ impl Manager { let parsed_message = MessageParser::parse(&message, referenced_message, &self.config, self.latch_state); match parsed_message { - message_parser::ParsedMessage::UnproxiedMessage => (), + message_parser::ParsedMessage::UnproxiedMessage(log_string) => if let Some(log_string) = log_string { + let _ = self.ui_sender.send((self.name.clone(), SystemUiEvent::LogLine( + format!("Parse error: {log_string}") + ))); + }, message_parser::ParsedMessage::LatchClear(member_id) => { let _ = self.bots.get(&member_id).unwrap().delete_message(message.channel_id, message.id).await; self.latch_state = None; self.update_status_of_system().await; + let _ = self.ui_sender.send((self.name.clone(), SystemUiEvent::MemberAutoproxy(None))); }, message_parser::ParsedMessage::SetProxyAndDelete(member_id) => { @@ -234,7 +263,9 @@ impl Manager { let author = MessageParser::get_member_id_from_user_id(referenced_message.unwrap().author.id, &self.config); if author.is_none() { - println!("Cannot edit another user's message"); + let _ = self.ui_sender.send((self.name.clone(), SystemUiEvent::LogLine( + format!("Cannot edit another user's message") + ))); let _ = self.bots.get(&member_id).unwrap().react_message(message.channel_id, message.id, &RequestReactionType::Unicode { name: "🛑" }).await; return } @@ -254,14 +285,18 @@ impl Manager { message_parser::ParsedMessage::Command(Command::Reproxy(member_id, message_id)) => { if !referenced_message.map(|message| message.id == message_id).unwrap_or(false) { - println!("ERROR: Attempted reproxy on message other than referenced_message"); + let _ = self.ui_sender.send((self.name.clone(), SystemUiEvent::LogLine( + format!("ERROR: Attempted reproxy on message other than referenced_message") + ))); let _ = self.bots.get(&member_id).unwrap().react_message(message.channel_id, message.id, &RequestReactionType::Unicode { name: "⁉️" }).await; return } let author = MessageParser::get_member_id_from_user_id(referenced_message.unwrap().author.id, &self.config); if author.is_none() { - println!("Cannot reproxy another user's message"); + let _ = self.ui_sender.send((self.name.clone(), SystemUiEvent::LogLine( + format!("Cannot reproxy another user's message") + ))); let _ = self.bots.get(&member_id).unwrap().react_message(message.channel_id, message.id, &RequestReactionType::Unicode { name: "🛑" }).await; return } @@ -274,7 +309,9 @@ impl Manager { self.update_status_of_system().await; } } else { - println!("Not reproxying under same user"); + let _ = self.ui_sender.send((self.name.clone(), SystemUiEvent::LogLine( + format!("Not reproxying under same user") + ))); } let bot = self.bots.get(&member_id).unwrap(); @@ -286,7 +323,9 @@ impl Manager { let author = MessageParser::get_member_id_from_user_id(referenced_message.unwrap().author.id, &self.config); if author.is_none() { - println!("Cannot delete another user's message"); + let _ = self.ui_sender.send((self.name.clone(), SystemUiEvent::LogLine( + format!("Cannot delete another user's message") + ))); let _ = self.bots.get(&member_id).unwrap().react_message(message.channel_id, message.id, &RequestReactionType::Unicode { name: "🛑" }).await; return } @@ -296,6 +335,12 @@ impl Manager { let _ = bot.delete_message(message.channel_id, message.id).await; } + message_parser::ParsedMessage::Command(Command::Log(log_string)) => { + let _ = self.ui_sender.send((self.name.clone(), SystemUiEvent::LogLine( + format!("Log: {log_string}") + ))); + } + message_parser::ParsedMessage::Command(Command::UnknownCommand) => { let member_id = if let Some((member_id, _)) = self.latch_state { member_id @@ -317,7 +362,9 @@ impl Manager { let duplicate_result = bot.duplicate_message(message, content).await; if duplicate_result.is_err() { - println!("Could not copy message: {:?}", duplicate_result); + let _ = self.ui_sender.send((self.name.clone(), SystemUiEvent::LogLine( + format!("Could not copy message: {:?}", duplicate_result) + ))); return Err(()) } @@ -325,7 +372,9 @@ impl Manager { let delete_result = bot.delete_message(message.channel_id, message.id).await; if delete_result.is_err() { - println!("Could not delete message: {:?}", delete_result); + let _ = self.ui_sender.send((self.name.clone(), SystemUiEvent::LogLine( + format!("Could not delete message: {:?}", delete_result) + ))); // Delete the duplicated message if that failed let _ = bot.delete_message(message.channel_id, duplicate_result.unwrap().id).await; @@ -350,6 +399,9 @@ impl Manager { }) => { self.latch_state = Some((member, timestamp)); + let member = self.find_member_by_id(member).unwrap(); + let _ = self.ui_sender.send((self.name.clone(), SystemUiEvent::MemberAutoproxy(Some(member.name.clone())))); + if let Some(channel) = self.system_sender.clone() { let last_message = timestamp.clone(); let timeout_seconds = timeout_seconds.clone(); -- cgit 1.4.1