diff options
author | Ashelyn Rose <git@ashen.earth> | 2025-02-28 21:52:16 -0700 |
---|---|---|
committer | Ashelyn Rose <git@ashen.earth> | 2025-02-28 21:52:16 -0700 |
commit | e9253bd959bf5bf6e8bcc6de4db247895b015a16 (patch) | |
tree | 1817aed80c8255d1292007a81e7f0a16138d25cc | |
parent | 5cb49a76c2cedb500b82f405af3cf1dcc0507f98 (diff) |
Partial refactor
Handles message prefixes and autoproxy
-rw-r--r-- | Cargo.lock | 12 | ||||
-rw-r--r-- | Cargo.toml | 1 | ||||
-rw-r--r-- | src/config.rs | 4 | ||||
-rw-r--r-- | src/main.rs | 7 | ||||
-rw-r--r-- | src/system/aggregator.rs | 177 | ||||
-rw-r--r-- | src/system/bot/client.rs | 237 | ||||
-rw-r--r-- | src/system/bot/gateway.rs | 123 | ||||
-rw-r--r-- | src/system/bot/mod.rs | 105 | ||||
-rw-r--r-- | src/system/message_parser.rs | 238 | ||||
-rw-r--r-- | src/system/mod.rs | 541 | ||||
-rw-r--r-- | src/system/plugin.rs | 25 | ||||
-rw-r--r-- | src/system/plugin/autoproxy.rs | 143 | ||||
-rw-r--r-- | src/system/plugin/prefixes.rs | 40 | ||||
-rw-r--r-- | src/system/types.rs | 59 | ||||
-rw-r--r-- | src/system/util.rs | 113 |
15 files changed, 511 insertions, 1314 deletions
diff --git a/Cargo.lock b/Cargo.lock index 67cc02c..7a51702 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -60,6 +60,17 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5c6cb57a04249c6480766f7f7cef5467412af1490f8d1e243141daddada3264f" [[package]] +name = "async-trait" +version = "0.1.86" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "644dd749086bf3771a2fbc5f256fdb982d53f011c7d5d560304eafeecebce79d" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] name = "atomic-waker" version = "1.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -1283,6 +1294,7 @@ dependencies = [ name = "seance-rs" version = "0.1.0" dependencies = [ + "async-trait", "crossterm", "futures", "lru", diff --git a/Cargo.toml b/Cargo.toml index 1a7acb7..c9a5391 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,6 +6,7 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +async-trait = "0.1.86" crossterm = "0.28.1" lru = "0.12.3" futures = "0.3.30" diff --git a/src/config.rs b/src/config.rs index efbb50d..f9835aa 100644 --- a/src/config.rs +++ b/src/config.rs @@ -3,8 +3,6 @@ use std::collections::HashMap; use regex::{Regex, RegexBuilder}; use serde::{Deserialize, Deserializer, de::Error}; -use crate::system::UserId; - #[derive(Deserialize)] pub enum AutoProxyScope { Global, @@ -70,8 +68,6 @@ pub struct Member { #[serde(deserialize_with = "parse_regex")] pub message_pattern: Regex, pub discord_token: String, - #[serde(skip)] - pub user_id: Option<UserId>, pub presence: Option<PresenceMode>, pub status: Option<String>, } diff --git a/src/main.rs b/src/main.rs index 2058faf..c795e1c 100644 --- a/src/main.rs +++ b/src/main.rs @@ -63,8 +63,8 @@ fn main() { join_handles.push((system_name.clone(), handle)); } - crossterm::execute!(io::stdout(), EnterAlternateScreen).unwrap(); - crossterm::execute!(io::stdout(), DisableLineWrap).unwrap(); + // crossterm::execute!(io::stdout(), EnterAlternateScreen).unwrap(); + // crossterm::execute!(io::stdout(), DisableLineWrap).unwrap(); loop { // Wait for an event from one of the threads @@ -199,8 +199,7 @@ fn spawn_system(system_name : &String, system_config: config::System, waker: mps let dup_waker = waker.clone(); thread_local_runtime.block_on(async { - let mut system = Manager::new(name.clone(), config, waker); - system.start_clients().await; + Manager::start(name.clone(), config, waker).await; }); let _ = dup_waker.send((name, SystemUiEvent::SystemClose)); diff --git a/src/system/aggregator.rs b/src/system/aggregator.rs index 52b1a8c..822c698 100644 --- a/src/system/aggregator.rs +++ b/src/system/aggregator.rs @@ -1,114 +1,97 @@ use lru::LruCache; -use std::sync::Arc; -use tokio::sync::RwLock; use std::num::NonZeroUsize; -use tokio::sync::mpsc::{channel, Receiver, Sender}; -use twilight_model::channel::Message as TwiMessage; +use tokio::sync::mpsc::{channel, Receiver}; +use twilight_model::{util::Timestamp, channel::Message, gateway::payload::incoming::MessageUpdate, id::{marker::MessageMarker, Id}}; +use crate::system::types::System; -use super::{MemberId, Message as GatewayMessage, MessageEvent, MessageId, SystemEvent}; +type DiscordToken = String; -pub struct AggregatorState { - rx: Receiver<MessageEvent>, - tx: Sender<MessageEvent>, - message_cache: lru::LruCache<MessageId, (TwiMessage, MemberId)>, - system_emitter: Option<Sender<SystemEvent>>, -} +pub struct MessageAggregator; -pub struct MessageAggregator { - state: Arc<RwLock<AggregatorState>>, +enum IncomingMessage { + Complete {message: Message, timestamp: Timestamp, seen_by: DiscordToken}, + Partial {message: MessageUpdate, timestamp: Timestamp, seen_by: DiscordToken}, } impl MessageAggregator { - pub fn new(system_size: usize) -> Self { - let buf_size = std::cmp::max(system_size * 2, 1); - let (tx, rx) = channel::<MessageEvent>(buf_size); - - Self { - state: Arc::new(RwLock::new( AggregatorState { - tx, - rx, - message_cache: LruCache::new(NonZeroUsize::new(buf_size).unwrap()), - system_emitter: None, - - })) + pub fn start(system: &System) -> Receiver<(Message, DiscordToken)> { + let incoming_channel = channel::<IncomingMessage>(32); + let outgoing_channel = channel::<(Message, DiscordToken)>(32); + + // Start incoming task for each member + for member in system.members.iter() { + let followed_user = system.followed_user.clone(); + let sender = incoming_channel.0.clone(); + let member = member.clone(); + + tokio::spawn(async move { + loop { + let next_event = {member.shard.lock().await.next_event().await}; + + match next_event { + Err(err) => println!("Error"), + Ok(twilight_gateway::Event::MessageCreate(message)) => { + if message.author.id == followed_user { + sender.send(IncomingMessage::Complete { + message: message.0.clone(), + timestamp: message.timestamp.clone(), + seen_by: member.discord_token.clone() + }).await; + } + }, + Ok(twilight_gateway::Event::MessageUpdate(message)) => { + if message.author.is_some() && {message.author.as_ref().unwrap().id == followed_user} { + sender.send(IncomingMessage::Partial { + message: (*message).clone(), + timestamp: message.edited_timestamp.unwrap_or(message.timestamp.expect("No message timestamp")), + seen_by: member.discord_token.clone() + }).await; + } + }, + _ => {} + } + } + }); } - } - - pub async fn get_sender(&self) -> Sender<MessageEvent> { - self.state.read().await.tx.clone() - } - - pub async fn set_system_handler(&mut self, emitter: Sender<SystemEvent>) -> () { - self.state.write().await.system_emitter = Some(emitter); - } - - // We probably don't actully need this since we've got a separate sent-cache by channel - // pub async fn lookup_message(&self, message_id: MessageId) -> Option<TwiMessage> { - // self.state.write().await.message_cache.get(&message_id).map(|m| m.clone()) - // } - - pub fn start(&self) -> () { - let state = self.state.clone(); + // Start deduping task + let mut message_cache = LruCache::<Id<MessageMarker>, (Message, Timestamp, DiscordToken)>::new(NonZeroUsize::new(32).unwrap()); + let mut receiver = incoming_channel.1; + let sender = outgoing_channel.0; + let system = system.clone(); tokio::spawn(async move { loop { - let system_emitter = { state.read().await.system_emitter.clone().expect("No system emitter") }; - let self_emitter = { state.read().await.tx.clone() }; - let next_event = { state.write().await.rx.recv().await }; - - - match next_event { + match receiver.recv().await { None => (), - Some((timestamp, message)) => { - match message { - GatewayMessage::Partial(current_partial, member_id) => { - let cache_content = { state.write().await.message_cache.get(¤t_partial.id).map(|m| m.clone()) }; - match cache_content { - Some((original_message, member_id)) => { - - let mut updated_message = original_message.clone(); - if let Some(edited_time) = current_partial.edited_timestamp { - updated_message.edited_timestamp = Some(edited_time); - } - - if let Some(content) = ¤t_partial.content { - updated_message.content = content.clone() - } - - self_emitter.send((timestamp, GatewayMessage::Complete(updated_message, member_id))).await; - }, - None => { - system_emitter.send( - SystemEvent::RefetchMessage(member_id, current_partial.id, current_partial.channel_id) - ).await; - }, - }; - }, - GatewayMessage::Complete(message, member_id) => { - let previous_message = { state.write().await.message_cache.get(&message.id).map(|m| m.clone()) }; - - if let Some((previous_message, _last_seen_by)) = previous_message { - let previous_timestamp = previous_message.edited_timestamp.unwrap_or(previous_message.timestamp); - let current_timestamp = message.edited_timestamp.unwrap_or(message.timestamp); - - // Should we skip sending - if previous_timestamp.as_micros() >= current_timestamp.as_micros() { - continue - } - - // If not, fall through to update stored message - } - - { state.write().await.message_cache.put(message.id, (message.clone(), member_id)); }; - - system_emitter - .send(SystemEvent::NewMessage(timestamp, message, member_id)) - .await; - }, - }; - } - } + Some(IncomingMessage::Complete { message, timestamp, seen_by }) => { + if let None = message_cache.get(&message.id) { + message_cache.put(message.id.clone(), (message.clone(), timestamp, seen_by.clone())); + sender.send((message, seen_by)).await; + } + + }, + Some(IncomingMessage::Partial { message, timestamp, seen_by }) => { + if let Some((previous_message, previous_timestamp, previously_seen_by)) = message_cache.get(&message.id) { + if previous_timestamp.as_micros() < timestamp.as_micros() { + let mut updated_message = previous_message.clone(); + updated_message.content = message.content.unwrap_or(updated_message.content); + message_cache.put(message.id.clone(), (updated_message.clone(), timestamp, seen_by.clone())); + sender.send((updated_message, seen_by)).await; + } + } else { + let client = system.members.iter().find(|m| m.discord_token == seen_by).map(|m| m.client.clone()).expect("Could not find client"); + if let Ok(updated_message) = client.lock().await.message(message.channel_id, message.id).await.unwrap().model().await.map(|r|r.clone()) { + message_cache.put(message.id.clone(), (updated_message.clone(), timestamp, seen_by.clone())); + sender.send((updated_message, seen_by)).await; + }; + } + + }, + }; } }); + + return outgoing_channel.1 + } } diff --git a/src/system/bot/client.rs b/src/system/bot/client.rs deleted file mode 100644 index ee01e6e..0000000 --- a/src/system/bot/client.rs +++ /dev/null @@ -1,237 +0,0 @@ -use std::sync::Arc; -use futures::future::join_all; -use tokio::sync::RwLock; -use tokio::sync::Mutex; -use twilight_http::client::Client as TwiClient; -use twilight_http::error::Error as TwiError; -use twilight_http::request::channel::reaction::RequestReactionType; -use twilight_model::channel::message::{AllowedMentions, MentionType, MessageType}; -use twilight_model::http::attachment::Attachment; - -use super::*; - -pub struct Client { - client: Arc<Mutex<TwiClient>>, - bot_conf: Arc<RwLock<BotConfig>>, -} - -impl Client { - pub fn new(discord_token: &String, bot_conf: &Arc<RwLock<BotConfig>>) -> Self { - Self { - client: Arc::new(Mutex::new(TwiClient::new(discord_token.clone()))), - bot_conf: bot_conf.clone(), - } - } - - pub async fn set_nick<'a>(&self, server_id: ServerId, nick: &'a str) -> Result<(), TwiError> { - let client = self.client.lock().await; - - client - .update_current_member(server_id) - .nick(Some(nick)).expect("Invalid nick") - .await?; - - Ok(()) - } - - pub async fn fetch_message(&self, message_id: MessageId, channel_id: ChannelId) -> FullMessage { - let client = self.client.lock().await; - - client - .message(channel_id, message_id) - .await - .expect("Could not load message") - .model() - .await - .expect("Could not deserialize message") - } - - pub async fn fetch_recent_channel_messages(&self, channel_id: ChannelId) -> Result<Vec<FullMessage>, TwiError> { - let client = self.client.lock().await; - - Ok(client - .channel_messages(channel_id) - .limit(10).unwrap() - .await? - .model() - .await - .unwrap()) - } - - pub async fn resend_message(&self, message_id: MessageId, channel_id: ChannelId) { - let bot_conf = self.bot_conf.read().await; - let message = self.fetch_message(message_id, channel_id).await; - let message_channel = bot_conf.message_handler.as_ref().expect("No message handler"); - - let timestamp = if message.edited_timestamp.is_some() { - message.edited_timestamp.unwrap() - } else { - message.timestamp - }; - - message_channel - .send((timestamp, Message::Complete(message, bot_conf.member_id))) - .await; - } - - pub async fn delete_message(&self, channel_id: ChannelId, message_id: MessageId) -> Result<(), TwiError> { - let client = self.client.lock().await; - let delete_result = client.delete_message(channel_id, message_id).await; - let member_id = self.bot_conf.read().await.member_id; - - drop(client); - - match delete_result { - Err(err) => { - match &err.kind() { - twilight_http::error::ErrorType::Response { body: _, error, status: _ } => match error { - twilight_http::api_error::ApiError::General(err) => { - // Code for "Missing Permissions": https://discord.com/developers/docs/topics/opcodes-and-status-codes#json-json-error-codes - if err.code == 50013 { - println!("ERROR: Client {} doesn't have permissions to delete message", member_id); - let _ = self.react_message(channel_id, message_id, &RequestReactionType::Unicode { name: "๐" }).await; - } - }, - _ => (), - }, - _ => (), - }; - - Err(err) - }, - _ => Ok(()), - } - } - - pub async fn react_message(&self, channel_id: ChannelId, message_id: MessageId, react: &'_ RequestReactionType<'_>) -> Result<(), TwiError> { - let _ = self.client.lock().await.create_reaction( - channel_id, - message_id, - react - ).await; - - return Ok(()) - } - - pub async fn edit_message(&self, channel_id: ChannelId, message_id: MessageId, new_content: String) -> Result<FullMessage, TwiError> { - Ok(self.client.lock().await.update_message(channel_id, message_id) - .content(Some(new_content.as_str())).expect("Invalid message contents") - .await.expect("Could not update message") - .model().await.unwrap()) - } - - pub async fn duplicate_message(&self, message: &TwiMessage, content: &str) -> Result<TwiMessage, MessageDuplicateError> { - let client = self.client.lock().await; - - let mut create_message = client.create_message(message.channel_id).content(content)?; - - let mut allowed_mentions = AllowedMentions { - parse: Vec::new(), - replied_user: false, - roles: message.mention_roles.clone(), - users: message.mentions.iter().map(|user| user.id).collect(), - }; - - if message.mention_everyone { - allowed_mentions.parse.push(MentionType::Everyone); - } - - if message.kind == MessageType::Reply { - if let Some(ref_message) = message.referenced_message.as_ref() { - create_message = create_message.reply(ref_message.id); - - let pings_referenced_author = message - .mentions - .iter() - .any(|user| user.id == ref_message.author.id); - - if pings_referenced_author { - allowed_mentions.replied_user = true; - } else { - allowed_mentions.replied_user = false; - } - } else { - panic!("Cannot proxy message: Was reply but no referenced message"); - } - } - - let attachments = join_all(message.attachments.iter().map(|attachment| async { - let filename = attachment.filename.clone(); - let description_opt = attachment.description.clone(); - let bytes = reqwest::get(attachment.proxy_url.clone()) - .await? - .bytes() - .await?; - let mut new_attachment = - Attachment::from_bytes(filename, bytes.try_into().unwrap(), attachment.id.into()); - - if let Some(description) = description_opt { - new_attachment.description(description); - } - - Ok(new_attachment) - })) - .await - .iter() - .filter_map( - |result: &Result<Attachment, MessageDuplicateError>| match result { - Ok(attachment) => Some(attachment.clone()), - Err(_) => None, - }, - ) - .collect::<Vec<_>>(); - - if attachments.len() > 0 { - create_message = create_message.attachments(attachments.as_slice())?; - } - - if let Some(flags) = message.flags { - create_message = create_message.flags(flags); - } - - create_message = create_message.allowed_mentions(Some(&allowed_mentions)); - let new_message = create_message.await?.model().await?; - - Ok(new_message) - } - - pub async fn leave_server(&self, server_id: ServerId) -> Result<(), TwiError> { - self.client.lock().await.leave_guild( - server_id, - ).await?; - - return Ok(()) - } -} - -#[derive(Debug)] -pub enum MessageDuplicateError { - MessageValidation(twilight_validate::message::MessageValidationError), - AttachmentRequest(reqwest::Error), - MessageCreate(twilight_http::error::Error), - ResponseDeserialization(twilight_http::response::DeserializeBodyError), -} - -impl From<twilight_validate::message::MessageValidationError> for MessageDuplicateError { - fn from(value: twilight_validate::message::MessageValidationError) -> Self { - MessageDuplicateError::MessageValidation(value) - } -} - -impl From<reqwest::Error> for MessageDuplicateError { - fn from(value: reqwest::Error) -> Self { - MessageDuplicateError::AttachmentRequest(value) - } -} - -impl From<twilight_http::error::Error> for MessageDuplicateError { - fn from(value: twilight_http::error::Error) -> Self { - MessageDuplicateError::MessageCreate(value) - } -} - -impl From<twilight_http::response::DeserializeBodyError> for MessageDuplicateError { - fn from(value: twilight_http::response::DeserializeBodyError) -> Self { - MessageDuplicateError::ResponseDeserialization(value) - } -} diff --git a/src/system/bot/gateway.rs b/src/system/bot/gateway.rs deleted file mode 100644 index bfe4603..0000000 --- a/src/system/bot/gateway.rs +++ /dev/null @@ -1,123 +0,0 @@ -use std::sync::Arc; -use tokio::sync::RwLock; -use tokio::sync::Mutex; -use twilight_model::gateway::OpCode; -use twilight_model::gateway::payload::outgoing::{update_presence::UpdatePresencePayload, UpdatePresence}; -use twilight_gateway::{ - Intents, Shard, ShardId, -}; - -use super::{Message, Status, SystemEvent, BotConfig}; - -pub struct Gateway { - shard: Arc<Mutex<Shard>>, - bot_conf: Arc<RwLock<BotConfig>>, -} - -impl Gateway { - pub fn new(discord_token: &String, bot_conf: &Arc<RwLock<BotConfig>>) -> Self { - let intents = Intents::GUILD_MEMBERS - | Intents::GUILD_PRESENCES - | Intents::GUILD_MESSAGES - | Intents::MESSAGE_CONTENT; - - Self { - shard: Arc::new(Mutex::new(Shard::new( - ShardId::ONE, - discord_token.clone(), - intents, - ))), - bot_conf: bot_conf.clone(), - } - } - - pub async fn set_status(&self, status: Status) { - { - let last_status = { (*self.bot_conf.read().await).last_status }; - - if status == last_status { - return - } - } - - - { - let mut shard = self.shard.lock().await; - - shard.command(&UpdatePresence { - d: UpdatePresencePayload { - activities: Vec::new(), - afk: false, - since: None, - status, - }, - op: OpCode::PresenceUpdate, - }).await.expect("Could not send command to gateway"); - } - - self.bot_conf.write().await.last_status = status; - } - - pub fn start_listening(&self) { - let bot_conf = self.bot_conf.clone(); - let shard = self.shard.clone(); - tokio::spawn(async move { - loop { - let bot_conf = { (*bot_conf.read().await).clone() }; - let next_event = { shard.lock().await.next_event().await }; - let system_channel = bot_conf.system_handler.as_ref().expect("No system channel"); - let message_channel = bot_conf.message_handler.as_ref().expect("No message channel"); - - match next_event { - Err(source) => { - system_channel - .send(SystemEvent::GatewayError(bot_conf.member_id, source.to_string())) - .await; - - if source.is_fatal() { - system_channel.send(SystemEvent::GatewayClosed(bot_conf.member_id)).await; - return; - } - } - Ok(event) => match event { - twilight_gateway::Event::Ready(ready) => { - system_channel - .send(SystemEvent::GatewayConnected(bot_conf.member_id, ready.user.id)) - .await; - } - - twilight_gateway::Event::MessageCreate(message_create) => { - let message = message_create.0; - - if message.author.id != bot_conf.reference_user_id { - continue; - } - - message_channel - .send((message.timestamp, Message::Complete(message, bot_conf.member_id))) - .await; - } - - twilight_gateway::Event::MessageUpdate(message_update) => { - if message_update.author.is_none() - || message_update.author.as_ref().unwrap().id != bot_conf.reference_user_id - { - continue; - } - - if message_update.edited_timestamp.is_none() || message_update.content.is_none() { - continue; - } - - message_channel - .send((message_update.edited_timestamp.unwrap(), Message::Partial(*message_update, bot_conf.member_id))) - .await; - } - - _ => (), - }, - }; - } - }); - } -} diff --git a/src/system/bot/mod.rs b/src/system/bot/mod.rs deleted file mode 100644 index d55562d..0000000 --- a/src/system/bot/mod.rs +++ /dev/null @@ -1,105 +0,0 @@ -mod client; -mod gateway; - -use std::sync::Arc; -use tokio::sync::mpsc::Sender; -use tokio::sync::RwLock; -use twilight_http::error::Error as TwiError; -use twilight_http::request::channel::reaction::RequestReactionType; - -pub use super::types::*; -pub use client::MessageDuplicateError; -use gateway::Gateway; -use client::Client; - -#[derive(Clone)] -pub struct BotConfig { - pub member_id: MemberId, - pub reference_user_id: UserId, - pub discord_token: String, - pub last_status: Status, - pub message_handler: Option<Sender<MessageEvent>>, - pub system_handler: Option<Sender<SystemEvent>>, -} - -pub struct Bot { - bot_conf: Arc<RwLock<BotConfig>>, - gateway: Gateway, - client: Client, -} - -impl Bot { - pub fn new( - member_id: MemberId, - config: &crate::config::Member, - reference_user_id: UserId, - ) -> Self { - let bot_conf = Arc::new(RwLock::new(BotConfig { - member_id, - reference_user_id, - discord_token: config.discord_token.clone(), - last_status: Status::Online, - message_handler: None, - system_handler: None, - })); - - Self { - gateway: Gateway::new(&config.discord_token, &bot_conf), - client: Client::new(&config.discord_token, &bot_conf), - bot_conf, - } - } - - pub async fn set_message_handler(&mut self, handler: Sender<MessageEvent>) { - self.bot_conf.write().await.message_handler = Some(handler); - } - - pub async fn set_system_handler(&mut self, handler: Sender<SystemEvent>) { - self.bot_conf.write().await.system_handler = Some(handler); - } - - pub async fn set_status(&self, status: Status) { - self.gateway.set_status(status).await; - } - - pub async fn set_nick(&self, server_id: ServerId, nick: String) { - self.client.set_nick(server_id, nick.as_str()).await.expect("Could not update nick") - } - - pub fn start(&self) { - self.gateway.start_listening() - } - - pub async fn fetch_message(&self, message_id: MessageId, channel_id: ChannelId) -> TwiMessage { - self.client.fetch_message(message_id, channel_id).await - } - - pub async fn fetch_recent_channel_messages(&self, channel_id: ChannelId) -> Result<Vec<FullMessage>, TwiError> { - self.client.fetch_recent_channel_messages(channel_id).await - } - - pub async fn resend_message(&self, message_id: MessageId, channel_id: ChannelId) { - self.client.resend_message(message_id, channel_id).await; - } - - pub async fn edit_message(&self, channel_id: ChannelId, message_id: MessageId, new_content: String) -> Result<FullMessage, TwiError> { - self.client.edit_message(channel_id, message_id, new_content).await - } - - pub async fn delete_message(&self, channel_id: ChannelId, message_id: MessageId) -> Result<(), TwiError> { - self.client.delete_message(channel_id, message_id).await - } - - pub async fn react_message(&self, channel_id: ChannelId, message_id: MessageId, react: &'_ RequestReactionType<'_>) -> Result<(), TwiError> { - self.client.react_message(channel_id, message_id, react).await - } - - pub async fn duplicate_message(&self, message_id: &TwiMessage, content: &str) -> Result<TwiMessage, MessageDuplicateError> { - self.client.duplicate_message(message_id, content).await - } - - pub async fn leave_server(&self, server_id: ServerId) -> Result<(), TwiError> { - self.client.leave_server(server_id).await - } -} - diff --git a/src/system/message_parser.rs b/src/system/message_parser.rs deleted file mode 100644 index 7845fbf..0000000 --- a/src/system/message_parser.rs +++ /dev/null @@ -1,238 +0,0 @@ -use std::sync::LazyLock; -use regex::{Regex, RegexBuilder}; - -use crate::config::System; - -use twilight_mention::ParseMention; -use twilight_model::id::{marker::UserMarker, Id}; -use super::{FullMessage, MemberId, MessageId, ServerId, Timestamp, UserId}; - -pub enum ParsedMessage { - Command(Command), - SetProxyAndDelete(MemberId), - ProxiedMessage { - member_id: MemberId, - message_content: String, - latch: bool, - }, - UnproxiedMessage(Option<String>), - LatchClear(MemberId), - - // TODO: Figure out how to represent emotes - EmoteAdd(MemberId, MessageId, ()), - EmoteRemove(MemberId, MessageId, ()), -} - -pub enum Command { - Part(ServerId), - Edit(MemberId, MessageId, String), - Reproxy(MemberId, MessageId), - Delete(MessageId), - Nick(MemberId, String), - Log(String), - ReloadSystemConfig, - ExitSรฉance, - UnknownCommand, - InvalidCommand -} - -pub struct MessageParser {} - -static CORRECTION_REGEX: LazyLock<Regex> = LazyLock::new(|| { - Regex::new(r"^\*\B+$").unwrap() -}); - -impl MessageParser { - pub fn parse(message: &FullMessage, secondary_message: Option<&FullMessage>, system_config: &System, latch_state: Option<(MemberId, Timestamp)>) -> ParsedMessage { - if message.content == r"\\" { - return ParsedMessage::LatchClear(if let Some((member_id, _)) = latch_state { - member_id - } else { - 0 - }) - } - - if message.content.starts_with(r"\") { - return ParsedMessage::UnproxiedMessage(None) - } - - if message.content.starts_with(r"!") { - if let Some(parse) = MessageParser::check_command(message, secondary_message, system_config, latch_state) { - return ParsedMessage::Command(parse); - } else { - return ParsedMessage::UnproxiedMessage(Some(format!("Unknown command string: {}", message.content))); - } - } - - if CORRECTION_REGEX.is_match(message.content.as_str()) { - if let Some(parse) = MessageParser::check_correction(message, secondary_message) { - return parse - } - } - - if let Some(parse) = MessageParser::check_member_patterns(message, secondary_message, system_config) { - return parse - } - - if let Some(parse) = MessageParser::check_autoproxy(message, latch_state) { - return parse - } - - // If nothing else - ParsedMessage::UnproxiedMessage(None) - } - - fn check_command(message: &FullMessage, secondary_message: Option<&FullMessage>, system_config: &System, latch_state: Option<(MemberId, Timestamp)>) -> Option<Command> { - let mut words = message.content.strip_prefix("!").unwrap().split_whitespace(); - let first_word = words.next(); - - match first_word { - None => return None, - Some(command_name) => match command_name { - "log" => { - let remainder = words.remainder().unwrap().to_string(); - return Some(Command::Log(remainder)); - }, - "edit" => { - let editing_member = Self::get_member_id_from_user_id(secondary_message.as_ref().unwrap().author.id, system_config).unwrap(); - return Some(Command::Edit(editing_member, secondary_message.unwrap().id, words.remainder().unwrap().to_string())); - }, - "nick" => { - if let Some(member) = MessageParser::match_member(words.next(), system_config) { - return Some(Command::Nick(member, words.remainder().unwrap().to_string())); - } - }, - "reproxy" => { - if let Some(member) = MessageParser::match_member(words.next(), system_config) { - return Some(Command::Reproxy(member, secondary_message.unwrap().id)); - } - }, - "delete" => { - return Some(Command::Delete(secondary_message.unwrap().id)); - }, - "part" => { - return Some(Command::Part(message.guild_id.unwrap())); - }, - _ => (), - }, - } - - // Attempt matching !s - if message.content.chars().nth(1).unwrap() == 's' { - let separator = message.content.chars().nth(2).unwrap(); - let parts: Vec<&str> = message.content.split(separator).collect(); - - if parts.len() != 3 && parts.len() != 4 { - return None - } - - let pattern = parts.get(1).unwrap(); - let replacement = parts.get(2).unwrap(); - let flags = parts.get(3).unwrap_or(&""); - - let mut global = false; - let mut regex = RegexBuilder::new(pattern); - - for flag in flags.chars() {match flag { - 'i' => {regex.case_insensitive(true);}, - 'm' => {regex.multi_line(true);}, - 'g' => {global = true;}, - 'x' => {regex.ignore_whitespace(true);}, - 'R' => {regex.crlf(true);}, - 's' => {regex.dot_matches_new_line(true);}, - 'U' => {regex.swap_greed(true);}, - _ => {return None;}, - }}; - - let valid_regex = regex.build(); - let original_content = &secondary_message.as_ref().unwrap().content; - - // If the regex parses, replace with that - let new_content = if let Ok(regex) = valid_regex { - if global { - regex.replace_all(original_content.as_str(), *replacement).to_string() - } else { - regex.replace(original_content.as_str(), *replacement).to_string() - } - - // Else attempt replace as string - } else { - original_content.replace(pattern, replacement) - }; - - let editing_member = Self::get_member_id_from_user_id(secondary_message.as_ref().unwrap().author.id, system_config).unwrap(); - return Some(Command::Edit(editing_member, secondary_message.as_ref().unwrap().id, new_content.to_string())); - } - - // If unable to parse - None - } - - fn check_correction(message: &FullMessage, secondary_message: Option<&FullMessage>) -> Option<ParsedMessage> { - None - } - - fn check_member_patterns(message: &FullMessage, secondary_message: Option<&FullMessage>, system_config: &System) -> Option<ParsedMessage> { - let matches_prefix = system_config.members.iter().enumerate().find_map(|(member_id, member)| - Some((member_id, member.matches_proxy_prefix(&message)?)) - ); - - if let Some((member_id, matched_content)) = matches_prefix { - if matched_content.trim() == "*" { - Some(ParsedMessage::Command(Command::Reproxy(member_id, secondary_message.unwrap().id))) - } else if matched_content.trim() != "" { - Some(ParsedMessage::ProxiedMessage { - member_id, - message_content: matched_content.to_string(), - latch: true, - }) - } else { - Some(ParsedMessage::SetProxyAndDelete(member_id)) - } - } else { - None - } - } - - fn check_autoproxy(message: &FullMessage, latch_state: Option<(MemberId, Timestamp)>) -> Option<ParsedMessage> { - if let Some((member_id, _)) = latch_state { - Some(ParsedMessage::ProxiedMessage { - member_id, - message_content: message.content.clone(), - latch: true, - }) - } else { - None - } - } - - fn match_member(maybe_mention: Option<&str>, system_config: &System) -> Option<MemberId> { - if let Some(maybe_mention) = maybe_mention { - if let Ok(mention) = Id::<UserMarker>::parse(maybe_mention) { - return MessageParser::get_member_id_from_user_id(mention, system_config) - } - } - - None - } - - pub fn get_member_id_from_user_id(user_id: UserId, system_config: &System) -> Option<MemberId> { - system_config.members.iter().enumerate() - .filter(|(_id, m)| m.user_id.is_some()) - .find(|(_id, m)| m.user_id.unwrap() == user_id) - .map(|(id, _m)| id) - } -} - -impl crate::config::Member { - pub fn matches_proxy_prefix<'a>(&self, message: &'a FullMessage) -> Option<&'a str> { - match self.message_pattern.captures(message.content.as_str()) { - None => None, - Some(captures) => match captures.name("content") { - None => None, - Some(matched_content) => Some(matched_content.as_str()), - }, - } - } -} - diff --git a/src/system/mod.rs b/src/system/mod.rs index 0cc6463..77133f8 100644 --- a/src/system/mod.rs +++ b/src/system/mod.rs @@ -1,490 +1,95 @@ -use std::{collections::HashMap, num::NonZeroUsize, str::FromStr, time::Duration}; - -use std::sync::mpsc::Sender as ThreadSender; -use lru::LruCache; -use tokio::{ - sync::mpsc::{channel, Sender}, - time::sleep, -}; -use twilight_http::request::channel::reaction::RequestReactionType; -use twilight_model::{channel::message::{MessageReference, MessageType, ReactionType}, id::{marker::UserMarker, Id}}; -use twilight_model::util::Timestamp; - -use crate::config::{AutoproxyConfig, AutoproxyLatchScope, Member}; -use crate::SystemUiEvent; - mod aggregator; -mod bot; mod types; -mod message_parser; - -use message_parser::MessageParser; -use aggregator::MessageAggregator; -use bot::Bot; -pub use types::*; - -use self::message_parser::{Command, ParsedMessage}; +mod plugin; +mod util; +use twilight_gateway::{Intents, Shard, ShardId}; +use twilight_http::Client; +use twilight_model::channel::Message; +pub use types::SystemThreadCommand; +use crate::SystemUiEvent; +use std::{num::NonZeroU64, sync::Arc}; +use tokio::sync::Mutex; +use plugin::SeancePlugin; -pub struct Manager { - pub name: String, - pub config: crate::config::System, - pub bots: HashMap<MemberId, Bot>, - pub latch_state: Option<(MemberId, Timestamp)>, - pub system_sender: Option<Sender<SystemEvent>>, - pub aggregator: MessageAggregator, - pub send_cache: LruCache<ChannelId, TwiMessage>, - pub reference_user_id: UserId, - pub ui_sender: ThreadSender<(String, SystemUiEvent)>, -} +use std::sync::mpsc::Sender as ThreadSender; +use types::{Member, Response, System}; +pub struct Manager; impl Manager { - pub fn new(system_name: String, system_config: crate::config::System, ui_sender : ThreadSender<(String, SystemUiEvent)>) -> Self { - Self { - reference_user_id: Id::from_str(&system_config.reference_user_id.as_str()) - .expect(format!("Invalid user id for system {}", &system_name).as_str()), - aggregator: MessageAggregator::new(system_config.members.len()), - name: system_name, - config: system_config, - bots: HashMap::new(), - latch_state: None, - system_sender: None, - send_cache: LruCache::new(NonZeroUsize::new(15).unwrap()), - ui_sender, - } - } - - pub fn find_member_by_name<'a>( - &'a self, - name: &String, - ) -> Option<(MemberId, &'a crate::config::Member)> { - self.config - .members - .iter() - .enumerate() - .find(|(_member_id, member)| member.name == *name) - } - - pub fn find_member_by_id<'a>(&'a self, id: MemberId) -> Option<&'a Member> { - self.config - .members - .iter() - .enumerate() - .find(|(member_id, _)| *member_id == id) - .map_or(None, |(_member_id, member)| Some(member)) - } - - pub async fn start_clients(&mut self) { - let _ = self.ui_sender.send((self.name.clone(), SystemUiEvent::LogLine( - format!("Starting clients for system {}", self.name) - ))); - - let (system_sender, mut system_receiver) = channel::<SystemEvent>(100); - self.system_sender = Some(system_sender.clone()); - self.aggregator.set_system_handler(system_sender.clone()).await; - self.aggregator.start(); + pub async fn start(system_name: String, system_config: crate::config::System, ui_sender : ThreadSender<(String, SystemUiEvent)>) { + let gateway_intents = Intents::GUILD_MEMBERS | Intents::GUILD_PRESENCES | Intents::GUILD_MESSAGES | Intents::MESSAGE_CONTENT; + let system = System { + followed_user: NonZeroU64::try_from(system_config.reference_user_id.parse::<u64>().unwrap()).unwrap().into(), + command_prefix: "!".to_string(), + members: system_config.members.iter().map(|member| Member { + discord_token: member.discord_token.clone(), + message_pattern: member.message_pattern.clone(), + shard: Arc::new(Mutex::new(Shard::new( + ShardId::ONE, + member.discord_token.clone(), + gateway_intents.clone(), + ))), + client: Arc::new(Mutex::new(Client::new(member.discord_token.clone()))) + }).collect(), + }; - for member_id in 0..self.config.members.len() { - self.start_bot(member_id).await; - } + let mut message_receiver = aggregator::MessageAggregator::start(&system); - if self.config.members.len() < 1 { - let _ = self.ui_sender.send((self.name.clone(), SystemUiEvent::LogLine( - format!("WARNING: System {} has no configured members", &self.name) - ))); - } + let mut plugins : Vec<Box<dyn SeancePlugin>> = vec![ + Box::new(plugin::ProxyPrefixes), + Box::new(plugin::Autoproxy::new()), + ]; loop { - match system_receiver.recv().await { - Some(SystemEvent::GatewayConnected(member_id, user_id)) => { - self.config.members.iter_mut().enumerate() - .find(|(id, _)| *id == member_id).unwrap().1.user_id = Some(user_id); - - let member = self.find_member_by_id(member_id).unwrap(); - - let _ = self.ui_sender.send((self.name.clone(), SystemUiEvent::GatewayConnect(member.name.clone()))); - - let _ = self.ui_sender.send((self.name.clone(), SystemUiEvent::LogLine( - format!("Gateway client {} ({}) connected", member.name, member_id) - ))); - } - - Some(SystemEvent::GatewayError(member_id, message)) => { - let member = self.find_member_by_id(member_id).unwrap(); - - let _ = self.ui_sender.send((self.name.clone(), SystemUiEvent::LogLine( - format!("Gateway client {} ran into error {}", member.name, message) - ))); - } - - Some(SystemEvent::GatewayClosed(member_id)) => { - let member = self.find_member_by_id(member_id).unwrap(); - - let _ = self.ui_sender.send((self.name.clone(), SystemUiEvent::GatewayDisconnect(member.name.clone()))); - - let _ = self.ui_sender.send((self.name.clone(), SystemUiEvent::LogLine( - format!("Gateway client {} closed", member.name) - ))); - - self.start_bot(member_id).await; - } - - Some(SystemEvent::NewMessage(event_time, message, member_id)) => { - self.handle_message(message, event_time, member_id).await; - } - - Some(SystemEvent::RefetchMessage(member_id, message_id, channel_id)) => { - let bot = self.bots.get(&member_id).unwrap(); - bot.resend_message(message_id, channel_id).await; - } - - Some(SystemEvent::AutoproxyTimeout(time_scheduled)) => { - if let Some((_member, current_last_message)) = self.latch_state.clone() { - if current_last_message == time_scheduled { - let _ = self.ui_sender.send((self.name.clone(), SystemUiEvent::MemberAutoproxy(None))); - - let _ = self.ui_sender.send((self.name.clone(), SystemUiEvent::LogLine( - format!("Autoproxy timeout has expired: {} (last sent), {} (timeout scheduled)", current_last_message.as_secs(), time_scheduled.as_secs()) - ))); - self.latch_state = None; - self.update_status_of_system().await; + match message_receiver.recv().await { + None => (), + Some((message, seen_by)) => { + println!("Checking message: {}", message.content.clone()); + if message.content.starts_with(&system.command_prefix) { + for plugin in &mut plugins { + match plugin.handle_command(&system, &message).await { + plugin::CommandOutcome::Skipped => continue, + plugin::CommandOutcome::Handled => break, + plugin::CommandOutcome::Errored {message} => { + println!("Error: {message}"); + break + }, + } } - } - }, - - Some(SystemEvent::UpdateClientStatus(member_id)) => { - let bot = self.bots.get(&member_id).unwrap(); - - // TODO: handle other presence modes - if let Some((latched_id, _)) = self.latch_state { - if latched_id == member_id { - bot.set_status(Status::Online).await; - continue + } else { + let mut message_response = Response::Noop { delete_source: false }; + for plugin in &plugins { + plugin.handle_message(&system, &message, &mut message_response).await; } - } - - bot.set_status(Status::Invisible).await; - } - - _ => continue, - } - } - } - - async fn start_bot(&mut self, member_id: MemberId) { - let member = self.find_member_by_id(member_id).unwrap(); - - // Create gateway listener - let mut bot = Bot::new(member_id, &member, self.reference_user_id); - - bot.set_message_handler(self.aggregator.get_sender().await).await; - bot.set_system_handler(self.system_sender.as_ref().unwrap().clone()).await; - - // Start gateway listener - bot.start(); - self.bots.insert(member_id, bot); - - // Schedule status update after a few seconds - let rx = self.system_sender.as_ref().unwrap().clone(); - tokio::spawn(async move { - sleep(Duration::from_secs(10)).await; - let _ = rx.send(SystemEvent::UpdateClientStatus(member_id)).await; - }); - } - - async fn handle_message(&mut self, message: TwiMessage, timestamp: Timestamp, seen_by: MemberId) { - let bot = self.bots.get(&seen_by).expect("No client for member"); - - // If message type is reply, use that - let referenced_message = if let MessageType::Reply = message.kind { - message.referenced_message.as_ref().map(|message| message.as_ref()) - } else { - // Otherwise, check cache for lest message sent in channel - if self.send_cache.contains(&message.channel_id) { - self.send_cache.get(&message.channel_id) - } else { - // Or look it up if it's not in cache - let system_bot_ids : Vec<UserId> = self.config.members.iter().filter_map(|m| m.user_id).collect(); - let recent_messages = bot.fetch_recent_channel_messages(message.channel_id).await; - - let last_in_channel = recent_messages.map(|messages| { - messages.into_iter().filter(|message| - system_bot_ids.contains(&message.author.id) - ).max_by_key(|message| message.timestamp.as_micros()) - }).ok().flatten(); - - // Since we did all this work to look it up, insert it into cache - if let Some(last) = last_in_channel { - self.send_cache.put(message.channel_id, last); - } else { - let _ = self.ui_sender.send((self.name.clone(), SystemUiEvent::LogLine( - format!("WARNING: Could not look up most recent message in channel {}", message.channel_id) - ))); - }; - - // Return the message referenced from cache so there's no unnecessary clone - self.send_cache.get(&message.channel_id) - } - }; - - let parsed_message = MessageParser::parse(&message, referenced_message, &self.config, self.latch_state); - match parsed_message { - message_parser::ParsedMessage::UnproxiedMessage(log_string) => if let Some(log_string) = log_string { - let _ = self.ui_sender.send((self.name.clone(), SystemUiEvent::LogLine( - format!("Parse error: {log_string}") - ))); - }, + match message_response.clone() { + Response::Noop { delete_source } => { + if delete_source { + let client = system.members.iter().find(|m| m.discord_token == seen_by).map(|m| m.client.clone()) + .expect("No such client"); - message_parser::ParsedMessage::LatchClear(member_id) => { - let _ = self.bots.get(&member_id).unwrap().delete_message(message.channel_id, message.id).await; - self.latch_state = None; - self.update_status_of_system().await; - let _ = self.ui_sender.send((self.name.clone(), SystemUiEvent::MemberAutoproxy(None))); - }, - - message_parser::ParsedMessage::SetProxyAndDelete(member_id) => { - let _ = self.bots.get(&member_id).unwrap().delete_message(message.channel_id, message.id).await; - self.update_autoproxy_state_after_message(member_id, message.timestamp); - self.update_status_of_system().await; - } - - message_parser::ParsedMessage::ProxiedMessage { member_id, message_content, latch } => { - if let Ok(_) = self.proxy_message(&message, member_id, message_content.as_str()).await { - if latch { - self.update_autoproxy_state_after_message(member_id, timestamp); - self.update_status_of_system().await; - } - } - }, - - message_parser::ParsedMessage::Command(Command::Edit(member_id, message_id, new_content)) => { - let bot = self.bots.get(&member_id).unwrap(); - - let author = MessageParser::get_member_id_from_user_id(referenced_message.unwrap().author.id, &self.config); - if author.is_none() { - let _ = self.ui_sender.send((self.name.clone(), SystemUiEvent::LogLine( - format!("Cannot edit another user's message") - ))); - let _ = self.bots.get(&member_id).unwrap().react_message(message.channel_id, message.id, &RequestReactionType::Unicode { name: "๐" }).await; - return - } - - if let Ok(new_message) = bot.edit_message(message.channel_id, message_id, new_content).await { - - // If we just edited the most recently sent message in this channel, update - // cache for future edit commands - if self.send_cache.get(&new_message.channel_id).map_or(MessageId::new(1u64), |m| m.id) == message_id { - self.send_cache.put(new_message.channel_id, new_message); - } - - // Delete the command message - let _ = bot.delete_message(message.channel_id, message.id).await; - } - } - - message_parser::ParsedMessage::Command(Command::Reproxy(member_id, message_id)) => { - if !referenced_message.map(|message| message.id == message_id).unwrap_or(false) { - let _ = self.ui_sender.send((self.name.clone(), SystemUiEvent::LogLine( - format!("ERROR: Attempted reproxy on message other than referenced_message") - ))); - let _ = self.bots.get(&member_id).unwrap().react_message(message.channel_id, message.id, &RequestReactionType::Unicode { name: "โ๏ธ" }).await; - return - } - - let author = MessageParser::get_member_id_from_user_id(referenced_message.unwrap().author.id, &self.config); - if author.is_none() { - let _ = self.ui_sender.send((self.name.clone(), SystemUiEvent::LogLine( - format!("Cannot reproxy another user's message") - ))); - let _ = self.bots.get(&member_id).unwrap().react_message(message.channel_id, message.id, &RequestReactionType::Unicode { name: "๐" }).await; - return - } - - if author.unwrap() != member_id { - // TODO: Don't allow this if other messages have been sent maybe? - let orig = referenced_message.unwrap().clone(); - if let Ok(_) = self.proxy_message(&orig, member_id, orig.content.as_str()).await { - self.update_autoproxy_state_after_message(member_id, timestamp); - self.update_status_of_system().await; - } - } else { - let _ = self.ui_sender.send((self.name.clone(), SystemUiEvent::LogLine( - format!("Not reproxying under same user") - ))); - } - - let bot = self.bots.get(&member_id).unwrap(); - let _ = bot.delete_message(message.channel_id, message.id).await; - } - - message_parser::ParsedMessage::Command(Command::Delete(message_id)) => { - let member_id = self.latch_state.map(|(id,_)| id).unwrap_or(0); - - let author = MessageParser::get_member_id_from_user_id(referenced_message.unwrap().author.id, &self.config); - if author.is_none() { - let _ = self.ui_sender.send((self.name.clone(), SystemUiEvent::LogLine( - format!("Cannot delete another user's message") - ))); - let _ = self.bots.get(&member_id).unwrap().react_message(message.channel_id, message.id, &RequestReactionType::Unicode { name: "๐" }).await; - return - } - - let bot = self.bots.get(&member_id).unwrap(); - let _ = bot.delete_message(message.channel_id, message_id).await; - let _ = bot.delete_message(message.channel_id, message.id).await; - } - - message_parser::ParsedMessage::Command(Command::Log(log_string)) => { - let _ = self.ui_sender.send((self.name.clone(), SystemUiEvent::LogLine( - format!("Log: {log_string}") - ))); - } - - message_parser::ParsedMessage::Command(Command::Nick(member_id, nick)) => { - let bot = self.bots.get(&member_id).unwrap(); - let server_id = message.guild_id.expect("Message has no guild"); - - bot.set_nick(server_id, nick).await; - let _ = bot.delete_message(message.channel_id, message.id).await; - } - - message_parser::ParsedMessage::Command(Command::UnknownCommand) => { - let member_id = if let Some((member_id, _)) = self.latch_state { - member_id - } else { - 0 - }; - - let _ = self.bots.get(&member_id).unwrap().react_message(message.channel_id, message.id, &RequestReactionType::Unicode { name: "โ๏ธ" }).await; - }, - message_parser::ParsedMessage::Command(Command::Part(server_id)) => { - for bot in self.bots.values() { - let _ = bot.react_message(message.channel_id, message.id, &RequestReactionType::Unicode { name: "๐" }).await; - let _ = bot.leave_server(server_id).await; - } - }, - message_parser::ParsedMessage::Command(_) => todo!(), - message_parser::ParsedMessage::EmoteAdd(_, _, _) => todo!(), - message_parser::ParsedMessage::EmoteRemove(_, _, _) => todo!(), - } - } - - async fn proxy_message(&mut self, message: &TwiMessage, member: MemberId, content: &str) -> Result<(), ()> { - let bot = self.bots.get(&member).expect("No client for member"); - - let duplicate_result = bot.duplicate_message(message, content).await; - - if duplicate_result.is_err() { - let _ = self.ui_sender.send((self.name.clone(), SystemUiEvent::LogLine( - format!("Could not copy message: {:?}", duplicate_result) - ))); - return Err(()) - } - - // Try to delete message first as that fails more often - let delete_result = bot.delete_message(message.channel_id, message.id).await; - - if delete_result.is_err() { - let _ = self.ui_sender.send((self.name.clone(), SystemUiEvent::LogLine( - format!("Could not delete message: {:?}", delete_result) - ))); - - // Delete the duplicated message if that failed - let _ = bot.delete_message(message.channel_id, duplicate_result.unwrap().id).await; - return Err(()) - } - - // Sent successfully, add to send cache - let sent_message = duplicate_result.unwrap(); - self.send_cache.put(sent_message.channel_id, sent_message); - - Ok(()) - } - - fn update_autoproxy_state_after_message(&mut self, member: MemberId, timestamp: Timestamp) { - match &self.config.autoproxy { - None => (), - Some(AutoproxyConfig::Member { name: _ }) => (), - Some(AutoproxyConfig::Latch { - scope, - timeout_seconds, - presence_indicator: _, - }) => { - self.latch_state = Some((member, timestamp)); - - let member = self.find_member_by_id(member).unwrap(); - let _ = self.ui_sender.send((self.name.clone(), SystemUiEvent::MemberAutoproxy(Some(member.name.clone())))); - - if let Some(channel) = self.system_sender.clone() { - let last_message = timestamp.clone(); - let timeout_seconds = timeout_seconds.clone(); - - tokio::spawn(async move { - sleep(Duration::from_secs(timeout_seconds.into())).await; - channel - .send(SystemEvent::AutoproxyTimeout(last_message)) - .await - .expect("Channel has closed"); - }); - } - } - } - } + let _ = client.lock().await.delete_message(message.channel_id, message.id) + .await; + } + }, + Response::Proxy { member, content } => { + if let Ok(new_message) = util::duplicate_message(&member.client, &message, content.as_str()).await { + if let Err(err) = member.client.lock().await.delete_message(message.channel_id, message.id).await { + println!("Error proxying message: {err}"); + let _ = member.client.lock().await.delete_message(new_message.channel_id, new_message.id); + } - async fn update_status_of_system(&mut self) { - let member_states: Vec<(MemberId, Status)> = self - .config - .members - .iter() - .enumerate() - .map(|(member_id, member)| { - ( - member_id, - match &self.config.autoproxy { - None => Status::Invisible, - Some(AutoproxyConfig::Member { name }) => { - if member.name == *name { - Status::Online - } else { - Status::Invisible - } - } - Some(AutoproxyConfig::Latch { - scope, - timeout_seconds: _, - presence_indicator, - }) => { - if let AutoproxyLatchScope::Server = scope { - Status::Invisible - } else if !presence_indicator { - Status::Invisible - } else { - match &self.latch_state { - Some((latch_member, _last_timestamp)) => { - if member_id == *latch_member { - Status::Online - } else { - Status::Invisible - } + for plugin in &plugins { + plugin.post_response(&system, &new_message, message.channel_id, &message_response).await; } - None => Status::Invisible, } - } + }, } - }, - ) - }) - .collect(); - - for (member, status) in member_states { - self.update_status_of_member(member, status).await; + } + }, + } } } - - async fn update_status_of_member(&mut self, member: MemberId, status: Status) { - let bot = self.bots.get(&member).expect("No client for member"); - bot.set_status(status).await; - } } diff --git a/src/system/plugin.rs b/src/system/plugin.rs new file mode 100644 index 0000000..c458fd4 --- /dev/null +++ b/src/system/plugin.rs @@ -0,0 +1,25 @@ +mod autoproxy; +mod prefixes; + +use async_trait::async_trait; + +use twilight_model::{channel::{Channel, Message}, id::{marker::ChannelMarker, Id}}; +use crate::system::types::{System, Response}; + +pub use prefixes::ProxyPrefixes; +pub use autoproxy::Autoproxy; + +#[async_trait] +pub trait SeancePlugin { + async fn handle_command(&self, system: &System, message: &Message) -> CommandOutcome; + + async fn handle_message(&self, system: &System, message: &Message, response: &mut Response); + + async fn post_response(&self, system: &System, message: &Message, channel: Id<ChannelMarker>, response: &Response); +} + +pub enum CommandOutcome { + Skipped, + Handled, + Errored {message: String}, +} diff --git a/src/system/plugin/autoproxy.rs b/src/system/plugin/autoproxy.rs new file mode 100644 index 0000000..cd24e0b --- /dev/null +++ b/src/system/plugin/autoproxy.rs @@ -0,0 +1,143 @@ +use async_trait::async_trait; +use std::sync::Arc; +use tokio::sync::Mutex; +use twilight_model::{channel::{Channel, Message}, id::{marker::ChannelMarker, Id}, util::Timestamp}; +use crate::system::types::{System, Member, Response}; +use super::{CommandOutcome, SeancePlugin}; +use tokio::time::sleep; +use std::time::Duration; + +pub struct Autoproxy { + current_state: Arc<Mutex<InnerState>>, +} + +#[derive(Clone)] +enum InnerState { + Off, + LatchActive { current_member: Member, last_message: Timestamp }, + LatchInactive, + Member { current_member: Member }, +} + +impl Autoproxy { + pub fn new() -> Self { + Self { + current_state: Arc::new(Mutex::new(InnerState::Off)) + } + } +} + +#[async_trait] +impl SeancePlugin for Autoproxy { + async fn handle_command(&self, system: &System, message: &Message) -> CommandOutcome { + if message.content.starts_with(format!("{}auto ", system.command_prefix).as_str()) { + let args = message.content.replace(format!("{}auto ", system.command_prefix).as_str(), ""); + let mut remainder = args.split_whitespace(); + let first_word = remainder.next(); + + match first_word { + Some("off") => *self.current_state.lock().await = InnerState::Off, + Some("latch") => *self.current_state.lock().await = InnerState::LatchInactive, + Some("member") => { + return CommandOutcome::Errored { message: "Member mode not supported yet".to_string() } + }, + Some(other) => return CommandOutcome::Errored { message: format!("Unknown autoproxy mode: {other}") }, + None => return CommandOutcome::Errored { message: "Must specify autoproxy mode".to_string() }, + } + + CommandOutcome::Handled + } else { + CommandOutcome::Skipped + } + } + + async fn handle_message(&self, system: &System, message: &Message, response: &mut Response) { + if message.content.starts_with("\\") { + if message.content.starts_with("\\\\") { + if let InnerState::LatchActive {current_member: _, last_message: _} = {self.current_state.lock().await.clone()} { + {*self.current_state.lock().await = InnerState::LatchInactive}; + } + } + + *response = Response::Noop { delete_source: message.content == "\\\\" }; + return + } + + if let Response::Noop { delete_source: _ } = response { + let current_state = {self.current_state.lock().await.clone()}; + match current_state { + InnerState::Off => return, + InnerState::LatchInactive => return, + InnerState::Member { current_member } => { + *response = Response::Proxy { + member: current_member.clone(), + content: message.content.clone(), + } + }, + InnerState::LatchActive { current_member, last_message: _ } => { + *response = Response::Proxy { + member: current_member.clone(), + content: message.content.clone(), + } + }, + } + + } + } + + async fn post_response(&self, system: &System, message: &Message, channel: Id<ChannelMarker>, response: &Response) { + match response { + Response::Noop { delete_source } => return, + Response::Proxy { member, content } => { + let current_state = {self.current_state.lock().await.clone()}; + match current_state { + InnerState::Off => return, + InnerState::Member { current_member } => return, + InnerState::LatchInactive => { + {*self.current_state.lock().await = InnerState::LatchActive { + current_member: member.clone(), + last_message: message.timestamp, + }}; + + let state_arc = self.current_state.clone(); + let sent_member = member.clone(); + let sent_timestamp = message.timestamp.clone(); + + tokio::spawn(async move { + sleep(Duration::from_secs(15 * 60)).await; + let current_state = {state_arc.lock().await.clone()}; + + if let InnerState::LatchActive { current_member, last_message } = current_state { + if sent_member.discord_token == current_member.discord_token && sent_timestamp.as_micros() == last_message.as_micros() { + {*state_arc.lock().await = InnerState::LatchInactive}; + } + } + }); + }, + InnerState::LatchActive { current_member: _, last_message: _ } => { + {*self.current_state.lock().await = InnerState::LatchActive { + current_member: member.clone(), + last_message: message.timestamp, + }}; + + let state_arc = self.current_state.clone(); + let sent_member = member.clone(); + let sent_timestamp = message.timestamp.clone(); + + tokio::spawn(async move { + sleep(Duration::from_secs(15 * 60)).await; + let current_state = {state_arc.lock().await.clone()}; + + if let InnerState::LatchActive { current_member, last_message } = current_state { + if sent_member.discord_token == current_member.discord_token && sent_timestamp.as_micros() == last_message.as_micros() { + {*state_arc.lock().await = InnerState::LatchInactive}; + } + } + }); + }, + } + }, + } + + } +} diff --git a/src/system/plugin/prefixes.rs b/src/system/plugin/prefixes.rs new file mode 100644 index 0000000..a513573 --- /dev/null +++ b/src/system/plugin/prefixes.rs @@ -0,0 +1,40 @@ +use async_trait::async_trait; +use twilight_model::id::{marker::ChannelMarker, Id}; +use crate::system::types::Response; + +use super::{CommandOutcome, SeancePlugin}; + +pub struct ProxyPrefixes; + +#[async_trait] +impl SeancePlugin for ProxyPrefixes { + async fn handle_command(&self, _system: &crate::system::types::System, _message: &twilight_model::channel::Message) -> CommandOutcome { + CommandOutcome::Skipped + } + + async fn handle_message(&self, system: &crate::system::types::System, message: &twilight_model::channel::Message, response: &mut crate::system::types::Response) { + if let Response::Noop { delete_source: _ } = response { + for member in &system.members { + println!("Checking member prefix: {:?}", member.message_pattern); + match member.message_pattern.captures(message.content.as_str()) { + None => { + println!("Nope"); + continue; + }, + Some(captures) => match captures.name("content") { + None => continue, + Some(matched_content) => { + println!("Matched member prefix: {:?}", member.message_pattern); + *response = Response::Proxy { member: member.clone(), content: matched_content.as_str().to_string() }; + return + }, + } + } + } + } + } + + async fn post_response(&self, _system: &crate::system::types::System, _message: &twilight_model::channel::Message, _channel: Id<ChannelMarker>, _response: &crate::system::types::Response) { + return + } +} diff --git a/src/system/types.rs b/src/system/types.rs index 2698f9d..bd1e40f 100644 --- a/src/system/types.rs +++ b/src/system/types.rs @@ -1,46 +1,29 @@ -pub use twilight_model::channel::Message as TwiMessage; -use twilight_model::gateway::payload::incoming::MessageUpdate as PartialMessage; -use twilight_model::id::marker::{ChannelMarker, MessageMarker, UserMarker, GuildMarker}; -use twilight_model::id::Id; -use twilight_model::util::Timestamp; - -pub type MemberId = usize; -pub type MessageId = Id<MessageMarker>; -pub type ChannelId = Id<ChannelMarker>; -pub type ServerId = Id<GuildMarker>; -pub type UserId = Id<UserMarker>; -pub type FullMessage = TwiMessage; - -pub type Status = twilight_model::gateway::presence::Status; +use regex::Regex; +use twilight_http::Client; +use twilight_gateway::Shard; +use twilight_model::id::{marker::UserMarker, Id}; +use std::sync::Arc; +use tokio::sync::Mutex; #[derive(Clone)] -pub enum Message { - Complete(FullMessage, MemberId), - Partial(PartialMessage, MemberId), +pub struct Member { + pub discord_token: String, + pub message_pattern: Regex, + pub shard: Arc<Mutex<Shard>>, + pub client: Arc<Mutex<Client>>, } -pub type MessageEvent = (Timestamp, Message); -pub type ReactionEvent = (Timestamp, ()); -pub type CommandEvent = (Timestamp, ()); - -pub enum SystemEvent { - // Process of operation - GatewayConnected(MemberId, UserId), - GatewayError(MemberId, String), - GatewayClosed(MemberId), - RefetchMessage(MemberId, MessageId, ChannelId), - UpdateClientStatus(MemberId), - - // User event handling - NewMessage(Timestamp, FullMessage, MemberId), - EditedMessage(MessageEvent), - NewReaction(ReactionEvent), - - // Command handling - NewCommand(CommandEvent), +#[derive(Clone)] +pub struct System { + pub followed_user: Id<UserMarker>, + pub command_prefix: String, + pub members: Vec<Member> +} - // Autoproxy - AutoproxyTimeout(Timestamp), +#[derive(Clone)] +pub enum Response { + Proxy {member: Member, content: String}, + Noop {delete_source: bool}, } pub enum SystemThreadCommand { diff --git a/src/system/util.rs b/src/system/util.rs new file mode 100644 index 0000000..4b51c09 --- /dev/null +++ b/src/system/util.rs @@ -0,0 +1,113 @@ +use futures::future::join_all; +use twilight_http::Client; +use twilight_model::channel::{message::{AllowedMentions, MentionType, MessageType}, Message}; +use std::sync::Arc; +use tokio::sync::Mutex; +use twilight_model::http::attachment::Attachment; + +pub enum ProxyError { + InvalidMessage(twilight_validate::message::MessageValidationError), + AttachmentRequest(reqwest::Error), + MessageCreate(twilight_http::error::Error), + ResponseDeserialization(twilight_http::response::DeserializeBodyError), +} + +impl From<twilight_validate::message::MessageValidationError> for ProxyError { + fn from(value: twilight_validate::message::MessageValidationError) -> Self { + ProxyError::InvalidMessage(value) + } +} + +impl From<reqwest::Error> for ProxyError{ + fn from(value: reqwest::Error) -> Self { + ProxyError::AttachmentRequest(value) + } +} + +impl From<twilight_http::error::Error> for ProxyError{ + fn from(value: twilight_http::error::Error) -> Self { + ProxyError::MessageCreate(value) + } +} + +impl From<twilight_http::response::DeserializeBodyError> for ProxyError{ + fn from(value: twilight_http::response::DeserializeBodyError) -> Self { + ProxyError::ResponseDeserialization(value) + } +} + +pub async fn duplicate_message(client: &Arc<Mutex<Client>>, message: &Message, content: &str) -> Result<Message, ProxyError> { + let client = client.lock().await; + + let mut create_message = client.create_message(message.channel_id).content(content)?; + + let mut allowed_mentions = AllowedMentions { + parse: Vec::new(), + replied_user: false, + roles: message.mention_roles.clone(), + users: message.mentions.iter().map(|user| user.id).collect(), + }; + + if message.mention_everyone { + allowed_mentions.parse.push(MentionType::Everyone); + } + + if message.kind == MessageType::Reply { + if let Some(ref_message) = message.referenced_message.as_ref() { + create_message = create_message.reply(ref_message.id); + + let pings_referenced_author = message + .mentions + .iter() + .any(|user| user.id == ref_message.author.id); + + if pings_referenced_author { + allowed_mentions.replied_user = true; + } else { + allowed_mentions.replied_user = false; + } + } else { + panic!("Cannot proxy message: Was reply but no referenced message"); + } + } + + let attachments = join_all(message.attachments.iter().map(|attachment| async { + let filename = attachment.filename.clone(); + let description_opt = attachment.description.clone(); + let bytes = reqwest::get(attachment.proxy_url.clone()) + .await? + .bytes() + .await?; + let mut new_attachment = + Attachment::from_bytes(filename, bytes.try_into().unwrap(), attachment.id.into()); + + if let Some(description) = description_opt { + new_attachment.description(description); + } + + Ok(new_attachment) + })) + .await + .iter() + .filter_map( + |result: &Result<Attachment, ProxyError>| match result { + Ok(attachment) => Some(attachment.clone()), + Err(_) => None, + }, + ) + .collect::<Vec<_>>(); + + if attachments.len() > 0 { + create_message = create_message.attachments(attachments.as_slice())?; + } + + if let Some(flags) = message.flags { + create_message = create_message.flags(flags); + } + + create_message = create_message.allowed_mentions(Some(&allowed_mentions)); + let new_message = create_message.await?.model().await?; + + Ok(new_message) +} + |