From a6a120ae8b8ed08b0801d76e80a5f7a0b8cde44b Mon Sep 17 00:00:00 2001 From: Ashelyn Dawn Date: Tue, 1 Oct 2024 15:37:09 -0600 Subject: Refactor gateway and message aggregation --- src/system.rs | 374 ---------------------------------------------------------- 1 file changed, 374 deletions(-) delete mode 100644 src/system.rs (limited to 'src/system.rs') diff --git a/src/system.rs b/src/system.rs deleted file mode 100644 index f62eeb6..0000000 --- a/src/system.rs +++ /dev/null @@ -1,374 +0,0 @@ -use std::{collections::HashMap, convert::Infallible, num::NonZeroUsize, str::FromStr, time::Duration}; - -use tokio::{sync::mpsc::{channel, Receiver, Sender}, time::{Sleep,sleep}}; -use futures::future::join_all; -use twilight_gateway::MessageSender; -use twilight_http::Client; -use twilight_model::{channel::{message::{AllowedMentions, MentionType, MessageType}, Message}, gateway::{payload::outgoing::{update_presence::UpdatePresencePayload, UpdatePresence}, presence::Status, OpCode}, id::{Id, marker::{ChannelMarker, MessageMarker, UserMarker}}}; -use twilight_model::util::Timestamp; -use twilight_model::http::attachment::Attachment; - -use crate::{config::{AutoproxyConfig, AutoproxyLatchScope, Member, MemberName}, listener::{Listener, ClientEvent}}; - -pub struct System { - pub name: String, - pub config: crate::config::System, - pub message_dedup_cache: lru::LruCache, Timestamp>, - pub clients: HashMap, - pub latch_state: Option<(Member, Timestamp)>, - pub channel: (Sender, Receiver), - pub gateway_channels: HashMap, - pub last_presence: HashMap, -} - -impl System { - pub fn new(system_name: String, system_config: crate::config::System) -> Self { - System { - name: system_name, - config: system_config, - message_dedup_cache: lru::LruCache::new(NonZeroUsize::new(100).unwrap()), - clients: HashMap::new(), - latch_state: None, - channel: channel::(100), - gateway_channels: HashMap::new(), - last_presence: HashMap::new(), - } - } - - pub async fn start_clients(&mut self) { - println!("Starting clients for system {}", self.name); - - let reference_user_id : Id = Id::from_str(self.config.reference_user_id.as_str()) - .expect(format!("Invalid user ID: {}", self.config.reference_user_id).as_str()); - - for member in self.config.members.iter() { - let client = twilight_http::Client::new(member.discord_token.clone()); - self.clients.insert(member.name.clone(), client); - - let tx = self.channel.0.clone(); - let member = member.clone(); - - // Start gateway listener - tokio::spawn(async move { - let mut listener = Listener::new(member, reference_user_id); - listener.start_listening(tx).await; - }); - } - - loop { - match self.channel.1.recv().await { - Some(event) => match event { - ClientEvent::Ready { client_name, send_channel } => { - self.gateway_channels.insert(client_name, send_channel); - - if self.gateway_channels.len() == self.clients.len() { - let tx = self.channel.0.clone(); - tokio::spawn(async move { - sleep(Duration::from_secs(10)).await; - let _ = tx.send(ClientEvent::Startup).await; - }); - } - }, - ClientEvent::Message { event_time, message } => { - if self.is_new_message(message.id, event_time) { - self.handle_message(message, event_time).await; - } - }, - ClientEvent::Error(_err) => { - println!("Client ran into an error for system {}", self.name); - return - }, - ClientEvent::AutoproxyTimeout { last_message } => { - if let Some((_member, current_last_message)) = self.latch_state.clone() { - if current_last_message == last_message { - println!("Autoproxy timeout has expired: {} (last sent), {} (timeout scheduled)", current_last_message.as_secs(), last_message.as_secs()); - self.latch_state = None; - self.update_status_of_system(); - } - } - }, - ClientEvent::Startup => { - println!("Attempting to set startup status for system {}", self.name.clone()); - self.update_status_of_system(); - }, - }, - None => { - return - }, - } - } - } - - fn is_new_message(&mut self, message_id: Id, timestamp: Timestamp) -> bool { - let last_seen_timestamp = self.message_dedup_cache.get(&message_id); - let current_timestamp = timestamp; - - if last_seen_timestamp.is_none() || last_seen_timestamp.unwrap().as_micros() < current_timestamp.as_micros() { - self.message_dedup_cache.put(message_id, timestamp); - true - } else { - false - } - } - - async fn handle_message(&mut self, message: Message, timestamp: Timestamp) { - // TODO: Commands - if message.content.eq("!panic") { - panic!("Exiting due to user command"); - } - - // Escape sequence - if message.content.starts_with(r"\") { - if message.content == r"\\" { - let client = if let Some((current_member, _)) = self.latch_state.clone() { - self.clients.get(¤t_member.name).expect(format!("No client for member {}", current_member.name).as_str()) - } else { - self.clients.iter().next().expect("No clients!").1 - }; - - client.delete_message(message.channel_id, message.id).await.expect("Could not delete message"); - self.latch_state = None - } else if message.content.starts_with(r"\\") { - self.latch_state = None; - } - - return - } - - // TODO: Non-latching prefixes maybe? - - // Check for prefix - let match_prefix = self.config.members.iter().find_map(|member| Some((member, member.matches_proxy_prefix(&message)?))); - if let Some((member, matched_content)) = match_prefix { - self.proxy_message(&message, member, matched_content).await; - self.update_autoproxy_state_after_message(member.clone(), timestamp); - self.update_status_of_system(); - return - } - - // Check for autoproxy - if let Some(autoproxy_config) = &self.config.autoproxy { - match autoproxy_config { - AutoproxyConfig::Member {name} => { - let member = self.config.members.iter().find(|member| member.name == *name).expect("Invalid autoproxy member name"); - self.proxy_message(&message, member, message.content.as_str()).await; - }, - // TODO: Do something with the latch scope - // TODO: Do something with presence setting - AutoproxyConfig::Latch { scope, timeout_seconds, presence_indicator } => { - if let Some((member, last_timestamp)) = self.latch_state.clone() { - let time_since_last = timestamp.as_secs() - last_timestamp.as_secs(); - if time_since_last <= (*timeout_seconds).into() { - self.proxy_message(&message, &member, message.content.as_str()).await; - self.latch_state = Some((member.clone(), timestamp)); - self.update_autoproxy_state_after_message(member.clone(), timestamp); - self.update_status_of_system(); - } - } - }, - } - } - } - - async fn proxy_message(&self, message: &Message, member: &Member, content: &str) { - let client = self.clients.get(&member.name).expect("No client for member"); - - if let Err(err) = self.duplicate_message(message, client, content).await { - match err { - MessageDuplicateError::MessageCreate(err) => { - if err.to_string().contains("Cannot send an empty message") { - client.delete_message(message.channel_id, message.id).await.expect("Could not delete message"); - } - }, - _ => println!("Error: {:?}", err), - } - } else { - client.delete_message(message.channel_id, message.id).await.expect("Could not delete message"); - } - } - - async fn duplicate_message(&self, message: &Message, client: &Client, content: &str) -> Result { - let mut create_message = client.create_message(message.channel_id) - .content(content)?; - - let mut allowed_mentions = AllowedMentions { - parse: Vec::new(), - replied_user: false, - roles: message.mention_roles.clone(), - users: message.mentions.iter().map(|user| user.id).collect(), - }; - - if message.mention_everyone { - allowed_mentions.parse.push(MentionType::Everyone); - } - - if message.kind == MessageType::Reply { - if let Some(ref_message) = message.referenced_message.as_ref() { - create_message = create_message.reply(ref_message.id); - - let pings_referenced_author = message.mentions.iter().any(|user| user.id == ref_message.author.id); - - if pings_referenced_author { - allowed_mentions.replied_user = true; - } else { - allowed_mentions.replied_user = false; - } - - } else { - panic!("Cannot proxy message: Was reply but no referenced message"); - } - } - - let attachments = join_all(message.attachments.iter().map(|attachment| async { - let filename = attachment.filename.clone(); - let description_opt = attachment.description.clone(); - let bytes = reqwest::get(attachment.proxy_url.clone()).await?.bytes().await?; - let mut new_attachment = Attachment::from_bytes(filename, bytes.try_into().unwrap(), attachment.id.into()); - - if let Some(description) = description_opt { - new_attachment.description(description); - } - - Ok(new_attachment) - })).await.iter().filter_map(|result: &Result| match result { - Ok(attachment) => Some(attachment.clone()), - Err(_) => None, - }).collect::>(); - - if attachments.len() > 0 { - create_message = create_message.attachments(attachments.as_slice())?; - } - - if let Some(flags) = message.flags { - create_message = create_message.flags(flags); - } - - create_message = create_message.allowed_mentions(Some(&allowed_mentions)); - let new_message = create_message.await?.model().await?; - - Ok(new_message) - } - - fn update_autoproxy_state_after_message(&mut self, member: Member, timestamp: Timestamp) { - match &self.config.autoproxy { - None => (), - Some(AutoproxyConfig::Member { name: _ }) => (), - Some(AutoproxyConfig::Latch { scope, timeout_seconds, presence_indicator: _ }) => { - self.latch_state = Some((member.clone(), timestamp)); - - let tx = self.channel.0.clone(); - let last_message = timestamp.clone(); - let timeout_seconds = timeout_seconds.clone(); - - tokio::spawn(async move { - sleep(Duration::from_secs(timeout_seconds.into())).await; - tx.send(ClientEvent::AutoproxyTimeout { last_message }) - .await.expect("Channel has closed"); - }); - } - } - } - - fn update_status_of_system(&mut self) { - let member_states : Vec<(Member, Status)> = self.config.members.iter().map(|member| { - match &self.config.autoproxy { - None => (member.clone(), Status::Invisible), - Some(AutoproxyConfig::Member { name }) => (member.clone(), if member.name == *name { - Status::Online - } else { - Status::Invisible - }), - Some(AutoproxyConfig::Latch { scope, timeout_seconds: _, presence_indicator }) => { - if let AutoproxyLatchScope::Server = scope { - (member.clone(), Status::Invisible) - } - else if !presence_indicator { - (member.clone(), Status::Invisible) - } else { - match &self.latch_state { - Some((latch_member, _last_timestamp)) => (member.clone(), if member.name == latch_member.name { - Status::Online - } else { - Status::Invisible - }), - None => (member.clone(), Status::Invisible), - } - } - } - } - }).collect(); - - for (member, status) in member_states { - self.update_status_of_member(&member, status); - } - } - - fn update_status_of_member(&mut self, member: &Member, status: Status) { - let last_status = *self.last_presence.get(&member.name).unwrap_or(&Status::Offline); - - if status == last_status { - return - } - - let gateway_channel = self.gateway_channels.get(&member.name).expect("No gateway shard for member"); - gateway_channel.command(&UpdatePresence { - d: UpdatePresencePayload { - activities: Vec::new(), - afk: false, - since: None, - status, - }, - op: OpCode::PresenceUpdate, - }).expect("Could not send command to gateway"); - - self.last_presence.insert(member.name.clone(), status); - } -} - - - -impl crate::config::Member { - pub fn matches_proxy_prefix<'a>(&self, message: &'a Message) -> Option<&'a str> { - match self.message_pattern.captures(message.content.as_str()) { - None => None, - Some(captures) => { - match captures.name("content") { - None => None, - Some(matched_content) => Some(matched_content.as_str()), - } - } - } - } -} - -#[derive(Debug)] -enum MessageDuplicateError { - MessageValidation(twilight_validate::message::MessageValidationError), - AttachmentRequest(reqwest::Error), - MessageCreate(twilight_http::error::Error), - ResponseDeserialization(twilight_http::response::DeserializeBodyError) -} - -impl From for MessageDuplicateError { - fn from(value: twilight_validate::message::MessageValidationError) -> Self { - MessageDuplicateError::MessageValidation(value) - } -} - -impl From for MessageDuplicateError { - fn from(value: reqwest::Error) -> Self { - MessageDuplicateError::AttachmentRequest(value) - } -} - -impl From for MessageDuplicateError { - fn from(value: twilight_http::error::Error) -> Self { - MessageDuplicateError::MessageCreate(value) - } -} - -impl From for MessageDuplicateError { - fn from(value: twilight_http::response::DeserializeBodyError) -> Self { - MessageDuplicateError::ResponseDeserialization(value) - } -} -- cgit 1.4.1