From 8b716d49ed019213d91a45f094684f26fac289bd Mon Sep 17 00:00:00 2001 From: Ashelyn Rose Date: Wed, 2 Oct 2024 02:22:34 -0600 Subject: refactor gateway and client together into bot struct --- src/system/bot.rs | 317 ++++++++++++++++++++++++++++++++++++++++++++++++++ src/system/gateway.rs | 167 -------------------------- src/system/mod.rs | 232 +++++++----------------------------- src/system/types.rs | 3 +- 4 files changed, 361 insertions(+), 358 deletions(-) create mode 100644 src/system/bot.rs delete mode 100644 src/system/gateway.rs (limited to 'src/system') diff --git a/src/system/bot.rs b/src/system/bot.rs new file mode 100644 index 0000000..8014b60 --- /dev/null +++ b/src/system/bot.rs @@ -0,0 +1,317 @@ +use std::sync::Arc; +use tokio::sync::mpsc::Sender; +use tokio::sync::Mutex; +use futures::future::join_all; +use twilight_gateway::{Intents, Shard, ShardId}; +use twilight_http::{request::channel::reaction::RequestReactionType, Client}; +use twilight_http::error::Error as TwError; +use twilight_model::gateway::{ + payload::outgoing::{update_presence::UpdatePresencePayload, UpdatePresence}, + OpCode, +}; +use twilight_model::{ + channel, + channel::{ + message::{AllowedMentions, MentionType, MessageType}, + Message, + }, + id::{marker::UserMarker, Id}, +}; +use twilight_model::http::attachment::Attachment; + +use super::{MemberId, MessageEvent, Status, SystemEvent, UserId, MessageId, ChannelId}; + +#[derive(Clone)] +pub struct Bot { + member_id: MemberId, + reference_user_id: UserId, + last_presence: Arc>, + message_handler: Option>, + system_handler: Option>, + shard: Arc>, + client: Arc>, +} + +impl Bot { + pub fn new( + member_id: MemberId, + config: &crate::config::Member, + reference_user_id: UserId, + ) -> Self { + let intents = Intents::GUILD_MEMBERS + | Intents::GUILD_PRESENCES + | Intents::GUILD_MESSAGES + | Intents::MESSAGE_CONTENT; + + Self { + member_id, + reference_user_id, + last_presence: Arc::new(Mutex::new(Status::Online)), + message_handler: None, + system_handler: None, + shard: Arc::new(Mutex::new(Shard::new( + ShardId::ONE, + config.discord_token.clone(), + intents, + ))), + client: Arc::new(Mutex::new(Client::new(config.discord_token.clone()))), + } + } + + pub fn set_message_handler(&mut self, handler: Sender) { + self.message_handler = Some(handler); + } + + pub fn set_system_handler(&mut self, handler: Sender) { + self.system_handler = Some(handler); + } + + pub async fn set_status(&self, status: Status) { + let mut last_status = self.last_presence.lock().await; + + if status == *last_status { + return + } + + let mut shard = self.shard.lock().await; + + shard + .command(&UpdatePresence { + d: UpdatePresencePayload { + activities: Vec::new(), + afk: false, + since: None, + status, + }, + op: OpCode::PresenceUpdate, + }) + .await + .expect("Could not send command to gateway"); + + *last_status = status; + } + + pub async fn delete_message(&self, channel_id: ChannelId, message_id: MessageId) -> Result<(), TwError> { + let client = self.client.lock().await; + let delete_result = client.delete_message(channel_id, message_id).await; + + match delete_result { + Err(err) => { + match &err.kind() { + twilight_http::error::ErrorType::Response { body: _, error, status: _ } => match error { + twilight_http::api_error::ApiError::General(err) => { + // Code for "Missing Permissions": https://discord.com/developers/docs/topics/opcodes-and-status-codes#json-json-error-codes + if err.code == 50013 { + println!("ERROR: Client {} doesn't have permissions to delete message", self.member_id); + let _ = client.create_reaction( + channel_id, + message_id, + &RequestReactionType::Unicode { name: "🔐" } + ).await; + } + }, + _ => (), + }, + _ => (), + }; + + Err(err) + }, + _ => Ok(()), + } + } + + pub async fn duplicate_message(&self, message: &Message, content: &str) -> Result { + let client = self.client.lock().await; + + let mut create_message = client.create_message(message.channel_id).content(content)?; + + let mut allowed_mentions = AllowedMentions { + parse: Vec::new(), + replied_user: false, + roles: message.mention_roles.clone(), + users: message.mentions.iter().map(|user| user.id).collect(), + }; + + if message.mention_everyone { + allowed_mentions.parse.push(MentionType::Everyone); + } + + if message.kind == MessageType::Reply { + if let Some(ref_message) = message.referenced_message.as_ref() { + create_message = create_message.reply(ref_message.id); + + let pings_referenced_author = message + .mentions + .iter() + .any(|user| user.id == ref_message.author.id); + + if pings_referenced_author { + allowed_mentions.replied_user = true; + } else { + allowed_mentions.replied_user = false; + } + } else { + panic!("Cannot proxy message: Was reply but no referenced message"); + } + } + + let attachments = join_all(message.attachments.iter().map(|attachment| async { + let filename = attachment.filename.clone(); + let description_opt = attachment.description.clone(); + let bytes = reqwest::get(attachment.proxy_url.clone()) + .await? + .bytes() + .await?; + let mut new_attachment = + Attachment::from_bytes(filename, bytes.try_into().unwrap(), attachment.id.into()); + + if let Some(description) = description_opt { + new_attachment.description(description); + } + + Ok(new_attachment) + })) + .await + .iter() + .filter_map( + |result: &Result| match result { + Ok(attachment) => Some(attachment.clone()), + Err(_) => None, + }, + ) + .collect::>(); + + if attachments.len() > 0 { + create_message = create_message.attachments(attachments.as_slice())?; + } + + if let Some(flags) = message.flags { + create_message = create_message.flags(flags); + } + + create_message = create_message.allowed_mentions(Some(&allowed_mentions)); + let new_message = create_message.await?.model().await?; + + Ok(new_message) + } + + pub fn start_listening(&self) { + self.clone().start_listen_task() + } + + fn start_listen_task(self) { + tokio::spawn(async move { + loop { + let next_event = { self.shard.lock().await.next_event().await }; + + match next_event { + Err(source) => { + if let Some(channel) = &self.system_handler { + channel + .send(SystemEvent::GatewayError(self.member_id, source.to_string())) + .await; + + if source.is_fatal() { + channel.send(SystemEvent::GatewayClosed(self.member_id)).await; + break; + } + } + } + Ok(event) => match event { + twilight_gateway::Event::Ready(_) => { + if let Some(channel) = &self.system_handler { + channel + .send(SystemEvent::GatewayConnected(self.member_id)) + .await; + } + } + + twilight_gateway::Event::MessageCreate(message_create) => { + let message = message_create.0; + + if message.author.id != self.reference_user_id { + continue; + } + + if let Some(channel) = &self.message_handler { + channel + .send((message.timestamp, message)) + .await; + } + } + + twilight_gateway::Event::MessageUpdate(message_update) => { + if message_update.author.is_none() + || message_update.author.as_ref().unwrap().id != self.reference_user_id + { + continue; + } + + if message_update.edited_timestamp.is_none() { + println!("Message update but no edit timestamp"); + continue; + } + + if message_update.content.is_none() { + println!("Message update but no content"); + continue; + } + + let message = self.client + .lock() + .await + .message(message_update.channel_id, message_update.id) + .await + .expect("Could not load message") + .model() + .await + .expect("Could not deserialize message"); + + if let Some(channel) = &self.message_handler { + channel + .send((message_update.edited_timestamp.unwrap(), message)) + .await; + } + } + + _ => (), + }, + } + } + }); + } +} + + +#[derive(Debug)] +pub enum MessageDuplicateError { + MessageValidation(twilight_validate::message::MessageValidationError), + AttachmentRequest(reqwest::Error), + MessageCreate(twilight_http::error::Error), + ResponseDeserialization(twilight_http::response::DeserializeBodyError), +} + +impl From for MessageDuplicateError { + fn from(value: twilight_validate::message::MessageValidationError) -> Self { + MessageDuplicateError::MessageValidation(value) + } +} + +impl From for MessageDuplicateError { + fn from(value: reqwest::Error) -> Self { + MessageDuplicateError::AttachmentRequest(value) + } +} + +impl From for MessageDuplicateError { + fn from(value: twilight_http::error::Error) -> Self { + MessageDuplicateError::MessageCreate(value) + } +} + +impl From for MessageDuplicateError { + fn from(value: twilight_http::response::DeserializeBodyError) -> Self { + MessageDuplicateError::ResponseDeserialization(value) + } +} diff --git a/src/system/gateway.rs b/src/system/gateway.rs deleted file mode 100644 index 17bfe3d..0000000 --- a/src/system/gateway.rs +++ /dev/null @@ -1,167 +0,0 @@ -use std::sync::Arc; -use tokio::sync::mpsc::Sender; -use tokio::sync::Mutex; -use twilight_gateway::{Intents, Shard, ShardId}; -use twilight_http::Client; -use twilight_model::gateway::{ - payload::outgoing::{update_presence::UpdatePresencePayload, UpdatePresence}, - OpCode, -}; - -use super::{MemberId, MessageEvent, Status, SystemEvent, UserId}; - -pub struct Gateway { - member_id: MemberId, - discord_token: String, - reference_user_id: UserId, - message_handler: Option>>>, - system_handler: Option>>>, - shard: Arc>, -} - -impl Gateway { - pub fn new( - member_id: MemberId, - config: &crate::config::Member, - reference_user_id: UserId, - ) -> Self { - let intents = Intents::GUILD_MEMBERS - | Intents::GUILD_PRESENCES - | Intents::GUILD_MESSAGES - | Intents::MESSAGE_CONTENT; - - Self { - member_id, - discord_token: config.discord_token.clone(), - reference_user_id, - message_handler: None, - system_handler: None, - shard: Arc::new(Mutex::new(Shard::new( - ShardId::ONE, - config.discord_token.clone(), - intents, - ))), - } - } - - pub fn set_message_handler(&mut self, handler: Sender) { - self.message_handler = Some(Arc::new(Mutex::new(handler))); - } - - pub fn set_system_handler(&mut self, handler: Sender) { - self.system_handler = Some(Arc::new(Mutex::new(handler))); - } - - pub async fn set_status(&self, status: Status) { - let mut shard = self.shard.lock().await; - - shard - .command(&UpdatePresence { - d: UpdatePresencePayload { - activities: Vec::new(), - afk: false, - since: None, - status, - }, - op: OpCode::PresenceUpdate, - }) - .await - .expect("Could not send command to gateway"); - } - - pub fn start_listening(&self) { - let message_channel = self.message_handler.clone(); - let system_channel = self.system_handler.clone(); - let shard = self.shard.clone(); - let member_id = self.member_id.clone(); - let reference_user_id = self.reference_user_id.clone(); - let client = Client::new(self.discord_token.clone()); - - tokio::spawn(async move { - loop { - let next_event = { shard.lock().await.next_event().await }; - - match next_event { - Err(source) => { - if let Some(channel) = &system_channel { - let channel = channel.lock().await; - - channel - .send(SystemEvent::GatewayError(member_id, source.to_string())) - .await; - - if source.is_fatal() { - channel.send(SystemEvent::GatewayClosed(member_id)).await; - break; - } - } - todo!("Handle this") - } - Ok(event) => match event { - twilight_gateway::Event::Ready(_) => { - if let Some(channel) = &system_channel { - channel - .lock() - .await - .send(SystemEvent::GatewayConnected(member_id)) - .await; - } - } - - twilight_gateway::Event::MessageCreate(message_create) => { - let message = message_create.0; - - if message.author.id != reference_user_id { - continue; - } - - if let Some(channel) = &message_channel { - channel - .lock() - .await - .send((message.timestamp, message)) - .await; - } - } - - twilight_gateway::Event::MessageUpdate(message_update) => { - if message_update.author.is_none() - || message_update.author.as_ref().unwrap().id != reference_user_id - { - continue; - } - - if message_update.edited_timestamp.is_none() { - println!("Message update but no edit timestamp"); - continue; - } - - if message_update.content.is_none() { - println!("Message update but no content"); - continue; - } - - let message = client - .message(message_update.channel_id, message_update.id) - .await - .expect("Could not load message") - .model() - .await - .expect("Could not deserialize message"); - - if let Some(channel) = &message_channel { - channel - .lock() - .await - .send((message_update.edited_timestamp.unwrap(), message)) - .await; - } - } - - _ => (), - }, - } - } - }); - } -} diff --git a/src/system/mod.rs b/src/system/mod.rs index ce23466..4179e3b 100644 --- a/src/system/mod.rs +++ b/src/system/mod.rs @@ -1,37 +1,33 @@ use std::{collections::HashMap, str::FromStr, time::Duration}; -use futures::future::join_all; use tokio::{ sync::mpsc::{channel, Sender}, time::sleep, }; -use twilight_http::Client; -use twilight_model::http::attachment::Attachment; -use twilight_model::util::Timestamp; use twilight_model::{ channel::{ - message::{AllowedMentions, MentionType, MessageType}, Message, }, id::{marker::UserMarker, Id}, }; +use twilight_model::util::Timestamp; use crate::config::{AutoproxyConfig, AutoproxyLatchScope, Member}; mod aggregator; -mod gateway; +mod bot; mod types; use aggregator::MessageAggregator; -use gateway::Gateway; +use bot::Bot; pub use types::*; +use self::bot::MessageDuplicateError; + pub struct Manager { pub name: String, pub config: crate::config::System, - pub clients: HashMap, - pub gateways: HashMap, + pub bots: HashMap, pub latch_state: Option<(MemberId, Timestamp)>, - pub last_presence: HashMap, pub system_sender: Option>, } @@ -40,10 +36,8 @@ impl Manager { Self { name: system_name, config: system_config, - clients: HashMap::new(), - gateways: HashMap::new(), + bots: HashMap::new(), latch_state: None, - last_presence: HashMap::new(), system_sender: None, } } @@ -81,19 +75,15 @@ impl Manager { aggregator.set_handler(system_sender.clone()); for (member_id, member) in self.config.members.iter().enumerate() { - // Create outgoing client - let client = twilight_http::Client::new(member.discord_token.clone()); - self.clients.insert(member_id, client); - // Create gateway listener - let mut listener = Gateway::new(member_id, &member, reference_user_id); + let mut bot = Bot::new(member_id, &member, reference_user_id); - listener.set_message_handler(aggregator.get_sender()); - listener.set_system_handler(system_sender.clone()); + bot.set_message_handler(aggregator.get_sender()); + bot.set_system_handler(system_sender.clone()); // Start gateway listener - listener.start_listening(); - self.gateways.insert(member_id, listener); + bot.start_listening(); + self.bots.insert(member_id, bot); } aggregator.start(); @@ -174,18 +164,16 @@ impl Manager { // Escape sequence if message.content.starts_with(r"\") { if message.content == r"\\" { - let client = if let Some((current_member, _)) = self.latch_state.clone() { - self.clients + let bot = if let Some((current_member, _)) = self.latch_state.clone() { + self.bots .get(¤t_member) .expect(format!("No client for member {}", current_member).as_str()) } else { - self.clients.iter().next().expect("No clients!").1 + self.bots.iter().next().expect("No clients!").1 }; - client - .delete_message(message.channel_id, message.id) - .await - .expect("Could not delete message"); + // We don't really care about the outcome here, we don't proxy afterwards + let _ = bot.delete_message(message.channel_id, message.id).await; self.latch_state = None } else if message.content.starts_with(r"\\") { self.latch_state = None; @@ -197,7 +185,6 @@ impl Manager { // TODO: Non-latching prefixes maybe? // Check for prefix - println!("Checking prefix"); let match_prefix = self.config .members @@ -207,16 +194,15 @@ impl Manager { Some((member_id, member.matches_proxy_prefix(&message)?)) }); if let Some((member_id, matched_content)) = match_prefix { - self.proxy_message(&message, member_id, matched_content) - .await; - println!("Updating proxy state to member id {}", member_id); - self.update_autoproxy_state_after_message(member_id, timestamp); - self.update_status_of_system().await; - return; + if let Ok(_) = self.proxy_message(&message, member_id, matched_content).await { + self.latch_state = Some((member_id, timestamp)); + self.update_autoproxy_state_after_message(member_id, timestamp); + self.update_status_of_system().await; + } + return } // Check for autoproxy - println!("Checking autoproxy"); if let Some(autoproxy_config) = &self.config.autoproxy { match autoproxy_config { AutoproxyConfig::Member { name } => { @@ -233,126 +219,40 @@ impl Manager { timeout_seconds, presence_indicator, } => { - println!("Currently in latch mode"); if let Some((member, last_timestamp)) = self.latch_state.clone() { - println!("We have a latch state"); let time_since_last = timestamp.as_secs() - last_timestamp.as_secs(); - println!("Time since last (seconds) {}", time_since_last); if time_since_last <= (*timeout_seconds).into() { - println!("Proxying"); - self.proxy_message(&message, member, message.content.as_str()) - .await; - self.latch_state = Some((member, timestamp)); - self.update_autoproxy_state_after_message(member, timestamp); - self.update_status_of_system().await; + if let Ok(_) = self.proxy_message(&message, member, message.content.as_str()).await { + self.latch_state = Some((member, timestamp)); + self.update_autoproxy_state_after_message(member, timestamp); + self.update_status_of_system().await; + } } } } } - } else { - println!("No autoproxy config?"); } } - async fn proxy_message(&self, message: &Message, member: MemberId, content: &str) { - let client = self.clients.get(&member).expect("No client for member"); + async fn proxy_message(&self, message: &Message, member: MemberId, content: &str) -> Result<(), ()> { + let bot = self.bots.get(&member).expect("No client for member"); - if let Err(err) = self.duplicate_message(message, client, content).await { - match err { - MessageDuplicateError::MessageCreate(err) => { - if err.to_string().contains("Cannot send an empty message") { - client - .delete_message(message.channel_id, message.id) - .await - .expect("Could not delete message"); - } - } - _ => println!("Error: {:?}", err), - } - } else { - client - .delete_message(message.channel_id, message.id) - .await - .expect("Could not delete message"); - } - } + let duplicate_result = bot.duplicate_message(message, content).await; - async fn duplicate_message( - &self, - message: &Message, - client: &Client, - content: &str, - ) -> Result { - let mut create_message = client.create_message(message.channel_id).content(content)?; - - let mut allowed_mentions = AllowedMentions { - parse: Vec::new(), - replied_user: false, - roles: message.mention_roles.clone(), - users: message.mentions.iter().map(|user| user.id).collect(), - }; - - if message.mention_everyone { - allowed_mentions.parse.push(MentionType::Everyone); + if duplicate_result.is_err() { + return Err(()) } - if message.kind == MessageType::Reply { - if let Some(ref_message) = message.referenced_message.as_ref() { - create_message = create_message.reply(ref_message.id); - - let pings_referenced_author = message - .mentions - .iter() - .any(|user| user.id == ref_message.author.id); + // Try to delete message first as that fails more often + let delete_result = bot.delete_message(message.channel_id, message.id).await; - if pings_referenced_author { - allowed_mentions.replied_user = true; - } else { - allowed_mentions.replied_user = false; - } - } else { - panic!("Cannot proxy message: Was reply but no referenced message"); - } + if delete_result.is_err() { + // Delete the duplicated message if that failed + let _ = bot.delete_message(message.channel_id, duplicate_result.unwrap().id).await; + return Err(()) } - let attachments = join_all(message.attachments.iter().map(|attachment| async { - let filename = attachment.filename.clone(); - let description_opt = attachment.description.clone(); - let bytes = reqwest::get(attachment.proxy_url.clone()) - .await? - .bytes() - .await?; - let mut new_attachment = - Attachment::from_bytes(filename, bytes.try_into().unwrap(), attachment.id.into()); - - if let Some(description) = description_opt { - new_attachment.description(description); - } - - Ok(new_attachment) - })) - .await - .iter() - .filter_map( - |result: &Result| match result { - Ok(attachment) => Some(attachment.clone()), - Err(_) => None, - }, - ) - .collect::>(); - - if attachments.len() > 0 { - create_message = create_message.attachments(attachments.as_slice())?; - } - - if let Some(flags) = message.flags { - create_message = create_message.flags(flags); - } - - create_message = create_message.allowed_mentions(Some(&allowed_mentions)); - let new_message = create_message.await?.model().await?; - - Ok(new_message) + Ok(()) } fn update_autoproxy_state_after_message(&mut self, member: MemberId, timestamp: Timestamp) { @@ -433,25 +333,8 @@ impl Manager { } async fn update_status_of_member(&mut self, member: MemberId, status: Status) { - let last_status = *self.last_presence.get(&member).unwrap_or(&Status::Offline); - - if status == last_status { - return; - } - - if let Some(gateway) = self.gateways.get(&member) { - gateway.set_status(status).await; - - self.last_presence.insert(member, status); - } else { - let full_member = self - .find_member_by_id(member) - .expect("Cannot look up member"); - println!( - "Could not look up gateway for member ID {} ({})", - member, full_member.name - ); - } + let bot = self.bots.get(&member).expect("No client for member"); + bot.set_status(status).await; } } @@ -467,34 +350,3 @@ impl crate::config::Member { } } -#[derive(Debug)] -enum MessageDuplicateError { - MessageValidation(twilight_validate::message::MessageValidationError), - AttachmentRequest(reqwest::Error), - MessageCreate(twilight_http::error::Error), - ResponseDeserialization(twilight_http::response::DeserializeBodyError), -} - -impl From for MessageDuplicateError { - fn from(value: twilight_validate::message::MessageValidationError) -> Self { - MessageDuplicateError::MessageValidation(value) - } -} - -impl From for MessageDuplicateError { - fn from(value: reqwest::Error) -> Self { - MessageDuplicateError::AttachmentRequest(value) - } -} - -impl From for MessageDuplicateError { - fn from(value: twilight_http::error::Error) -> Self { - MessageDuplicateError::MessageCreate(value) - } -} - -impl From for MessageDuplicateError { - fn from(value: twilight_http::response::DeserializeBodyError) -> Self { - MessageDuplicateError::ResponseDeserialization(value) - } -} diff --git a/src/system/types.rs b/src/system/types.rs index 862ddd1..e8d14a6 100644 --- a/src/system/types.rs +++ b/src/system/types.rs @@ -1,10 +1,11 @@ use twilight_model::channel::Message; -use twilight_model::id::marker::{MessageMarker, UserMarker}; +use twilight_model::id::marker::{ChannelMarker, MessageMarker, UserMarker}; use twilight_model::id::Id; use twilight_model::util::Timestamp; pub type MemberId = usize; pub type MessageId = Id; +pub type ChannelId = Id; pub type UserId = Id; pub type Status = twilight_model::gateway::presence::Status; -- cgit 1.4.1