summary refs log tree commit diff
path: root/src/system.rs
diff options
context:
space:
mode:
authorAshelyn Dawn <ashe@ashen.earth>2024-10-01 15:37:09 -0600
committerAshelyn Dawn <ashe@ashen.earth>2024-10-01 15:37:09 -0600
commita6a120ae8b8ed08b0801d76e80a5f7a0b8cde44b (patch)
tree37242703c61137d1198bc373558828b9faa0732b /src/system.rs
parentf87e9727e69e981e4acff31a779e29a35637b964 (diff)
Refactor gateway and message aggregation
Diffstat (limited to 'src/system.rs')
-rw-r--r--src/system.rs374
1 files changed, 0 insertions, 374 deletions
diff --git a/src/system.rs b/src/system.rs
deleted file mode 100644
index f62eeb6..0000000
--- a/src/system.rs
+++ /dev/null
@@ -1,374 +0,0 @@
-use std::{collections::HashMap, convert::Infallible, num::NonZeroUsize, str::FromStr, time::Duration};
-
-use tokio::{sync::mpsc::{channel, Receiver, Sender}, time::{Sleep,sleep}};
-use futures::future::join_all;
-use twilight_gateway::MessageSender;
-use twilight_http::Client;
-use twilight_model::{channel::{message::{AllowedMentions, MentionType, MessageType}, Message}, gateway::{payload::outgoing::{update_presence::UpdatePresencePayload, UpdatePresence}, presence::Status, OpCode}, id::{Id, marker::{ChannelMarker, MessageMarker, UserMarker}}};
-use twilight_model::util::Timestamp;
-use twilight_model::http::attachment::Attachment;
-
-use crate::{config::{AutoproxyConfig, AutoproxyLatchScope, Member, MemberName}, listener::{Listener, ClientEvent}};
-
-pub struct System {
-    pub name: String,
-    pub config: crate::config::System,
-    pub message_dedup_cache: lru::LruCache<Id<MessageMarker>, Timestamp>,
-    pub clients: HashMap<MemberName, Client>,
-    pub latch_state: Option<(Member, Timestamp)>,
-    pub channel: (Sender<ClientEvent>, Receiver<ClientEvent>),
-    pub gateway_channels: HashMap<MemberName, MessageSender>,
-    pub last_presence: HashMap<MemberName, Status>,
-}
-
-impl System {
-    pub fn new(system_name: String, system_config: crate::config::System) -> Self {
-        System {
-            name: system_name,
-            config: system_config,
-            message_dedup_cache: lru::LruCache::new(NonZeroUsize::new(100).unwrap()),
-            clients: HashMap::new(),
-            latch_state: None,
-            channel: channel::<ClientEvent>(100),
-            gateway_channels: HashMap::new(),
-            last_presence: HashMap::new(),
-        }
-    }
-
-    pub async fn start_clients(&mut self) {
-        println!("Starting clients for system {}", self.name);
-
-        let reference_user_id : Id<UserMarker> = Id::from_str(self.config.reference_user_id.as_str())
-            .expect(format!("Invalid user ID: {}", self.config.reference_user_id).as_str());
-
-        for member in self.config.members.iter() {
-            let client = twilight_http::Client::new(member.discord_token.clone());
-            self.clients.insert(member.name.clone(), client);
-
-            let tx = self.channel.0.clone();
-            let member = member.clone();
-
-            // Start gateway listener
-            tokio::spawn(async move {
-                let mut listener = Listener::new(member, reference_user_id);
-                listener.start_listening(tx).await;
-            });
-        }
-
-        loop {
-            match self.channel.1.recv().await {
-                Some(event) => match event {
-                    ClientEvent::Ready { client_name, send_channel } => {
-                        self.gateway_channels.insert(client_name, send_channel);
-
-                        if self.gateway_channels.len() == self.clients.len() {
-                            let tx = self.channel.0.clone();
-                            tokio::spawn(async move {
-                                sleep(Duration::from_secs(10)).await;
-                                let _ = tx.send(ClientEvent::Startup).await;
-                            });
-                        }
-                    },
-                    ClientEvent::Message { event_time, message } => {
-                        if self.is_new_message(message.id, event_time) {
-                            self.handle_message(message, event_time).await;
-                        }
-                    },
-                    ClientEvent::Error(_err) => {
-                        println!("Client ran into an error for system {}", self.name);
-                        return
-                    },
-                    ClientEvent::AutoproxyTimeout { last_message } => {
-                        if let Some((_member, current_last_message)) = self.latch_state.clone() {
-                            if current_last_message == last_message {
-                                println!("Autoproxy timeout has expired: {} (last sent), {} (timeout scheduled)", current_last_message.as_secs(), last_message.as_secs());
-                                self.latch_state = None;
-                                self.update_status_of_system();
-                            }
-                        }
-                    },
-                    ClientEvent::Startup => {
-                        println!("Attempting to set startup status for system {}", self.name.clone());
-                        self.update_status_of_system();
-                    },
-                },
-                None => {
-                    return
-                },
-            }
-        }
-    }
-
-    fn is_new_message(&mut self, message_id: Id<MessageMarker>, timestamp: Timestamp) -> bool {
-        let last_seen_timestamp = self.message_dedup_cache.get(&message_id);
-        let current_timestamp = timestamp;
-
-        if last_seen_timestamp.is_none() || last_seen_timestamp.unwrap().as_micros() < current_timestamp.as_micros() {
-            self.message_dedup_cache.put(message_id, timestamp);
-            true
-        } else {
-            false
-        }
-    }
-
-    async fn handle_message(&mut self, message: Message, timestamp: Timestamp) {
-        // TODO: Commands
-        if message.content.eq("!panic") {
-            panic!("Exiting due to user command");
-        }
-
-        // Escape sequence
-        if message.content.starts_with(r"\") {
-            if message.content == r"\\" {
-                let client = if let Some((current_member, _)) = self.latch_state.clone() {
-                    self.clients.get(&current_member.name).expect(format!("No client for member {}", current_member.name).as_str())
-                } else {
-                    self.clients.iter().next().expect("No clients!").1
-                };
-
-                client.delete_message(message.channel_id, message.id).await.expect("Could not delete message");
-                self.latch_state = None
-            } else if message.content.starts_with(r"\\") {
-                self.latch_state = None;
-            }
-
-            return
-        }
-
-        // TODO: Non-latching prefixes maybe?
-        
-        // Check for prefix
-        let match_prefix = self.config.members.iter().find_map(|member| Some((member, member.matches_proxy_prefix(&message)?)));
-        if let Some((member, matched_content)) = match_prefix {
-            self.proxy_message(&message, member, matched_content).await;
-            self.update_autoproxy_state_after_message(member.clone(), timestamp);
-            self.update_status_of_system();
-            return
-        }
-
-        // Check for autoproxy
-        if let Some(autoproxy_config) = &self.config.autoproxy {
-            match autoproxy_config {
-                AutoproxyConfig::Member {name} => {
-                    let member = self.config.members.iter().find(|member| member.name == *name).expect("Invalid autoproxy member name");
-                    self.proxy_message(&message, member, message.content.as_str()).await;
-                },
-                // TODO: Do something with the latch scope
-                // TODO: Do something with presence setting
-                AutoproxyConfig::Latch { scope, timeout_seconds, presence_indicator } => {
-                    if let Some((member, last_timestamp)) = self.latch_state.clone() {
-                        let time_since_last = timestamp.as_secs() - last_timestamp.as_secs();
-                        if time_since_last <= (*timeout_seconds).into() {
-                            self.proxy_message(&message, &member, message.content.as_str()).await;
-                            self.latch_state = Some((member.clone(), timestamp));
-                            self.update_autoproxy_state_after_message(member.clone(), timestamp);
-                            self.update_status_of_system();
-                        }
-                    }
-                },
-            }
-        }
-    }
-
-    async fn proxy_message(&self, message: &Message, member: &Member, content: &str) {
-        let client = self.clients.get(&member.name).expect("No client for member");
-
-        if let Err(err) = self.duplicate_message(message, client, content).await {
-            match err {
-                MessageDuplicateError::MessageCreate(err) => {
-                    if err.to_string().contains("Cannot send an empty message") {
-                        client.delete_message(message.channel_id, message.id).await.expect("Could not delete message");
-                    }
-                },
-                _ => println!("Error: {:?}", err),
-            }
-        } else {
-            client.delete_message(message.channel_id, message.id).await.expect("Could not delete message");
-        }
-    }
-
-    async fn duplicate_message(&self, message: &Message, client: &Client, content: &str) -> Result<Message, MessageDuplicateError> {
-        let mut create_message = client.create_message(message.channel_id)
-            .content(content)?;
-
-        let mut allowed_mentions = AllowedMentions {
-            parse: Vec::new(),
-            replied_user: false,
-            roles: message.mention_roles.clone(),
-            users: message.mentions.iter().map(|user| user.id).collect(),
-        };
-
-        if message.mention_everyone {
-            allowed_mentions.parse.push(MentionType::Everyone);
-        }
-
-        if message.kind == MessageType::Reply {
-            if let Some(ref_message) = message.referenced_message.as_ref() {
-                create_message = create_message.reply(ref_message.id);
-
-                let pings_referenced_author = message.mentions.iter().any(|user| user.id == ref_message.author.id);
-
-                if pings_referenced_author {
-                    allowed_mentions.replied_user = true;
-                } else {
-                    allowed_mentions.replied_user = false;
-                }
-
-            } else {
-                panic!("Cannot proxy message: Was reply but no referenced message");
-            }
-        }
-
-        let attachments = join_all(message.attachments.iter().map(|attachment| async {
-            let filename = attachment.filename.clone();
-            let description_opt = attachment.description.clone();
-            let bytes = reqwest::get(attachment.proxy_url.clone()).await?.bytes().await?;
-            let mut new_attachment = Attachment::from_bytes(filename, bytes.try_into().unwrap(), attachment.id.into());
-
-            if let Some(description) = description_opt {
-                new_attachment.description(description);
-            }
-
-            Ok(new_attachment)
-        })).await.iter().filter_map(|result: &Result<Attachment, MessageDuplicateError>| match result {
-            Ok(attachment) => Some(attachment.clone()),
-            Err(_) => None,
-        }).collect::<Vec<_>>();
-
-        if attachments.len() > 0 {
-            create_message = create_message.attachments(attachments.as_slice())?;
-        }
-
-        if let Some(flags) = message.flags {
-            create_message = create_message.flags(flags);
-        }
-
-        create_message = create_message.allowed_mentions(Some(&allowed_mentions));
-        let new_message = create_message.await?.model().await?;
-
-        Ok(new_message)
-    }
-
-    fn update_autoproxy_state_after_message(&mut self, member: Member, timestamp: Timestamp) {
-        match &self.config.autoproxy {
-            None => (),
-            Some(AutoproxyConfig::Member { name: _ }) => (),
-            Some(AutoproxyConfig::Latch { scope, timeout_seconds, presence_indicator: _ }) => {
-                self.latch_state = Some((member.clone(), timestamp));
-
-                let tx = self.channel.0.clone();
-                let last_message = timestamp.clone();
-                let timeout_seconds = timeout_seconds.clone();
-
-                tokio::spawn(async move {
-                    sleep(Duration::from_secs(timeout_seconds.into())).await;
-                    tx.send(ClientEvent::AutoproxyTimeout { last_message })
-                        .await.expect("Channel has closed");
-                });
-            }
-        }
-    }
-
-    fn update_status_of_system(&mut self) {
-        let member_states : Vec<(Member, Status)> = self.config.members.iter().map(|member| {
-            match &self.config.autoproxy {
-                None => (member.clone(), Status::Invisible),
-                Some(AutoproxyConfig::Member { name }) => (member.clone(), if member.name == *name {
-                    Status::Online
-                } else {
-                    Status::Invisible
-                }),
-                Some(AutoproxyConfig::Latch { scope, timeout_seconds: _, presence_indicator }) => {
-                    if let AutoproxyLatchScope::Server = scope {
-                        (member.clone(), Status::Invisible)
-                    }
-                    else if !presence_indicator {
-                        (member.clone(), Status::Invisible)
-                    } else {
-                        match &self.latch_state {
-                            Some((latch_member, _last_timestamp)) => (member.clone(), if member.name == latch_member.name {
-                                Status::Online
-                            } else {
-                                Status::Invisible
-                            }),
-                            None => (member.clone(), Status::Invisible),
-                        }
-                    }
-                }
-            }
-        }).collect();
-
-        for (member, status) in member_states {
-            self.update_status_of_member(&member, status);
-        }
-    }
-
-    fn update_status_of_member(&mut self, member: &Member, status: Status) {
-        let last_status = *self.last_presence.get(&member.name).unwrap_or(&Status::Offline);
-
-        if status == last_status {
-            return
-        }
-
-        let gateway_channel = self.gateway_channels.get(&member.name).expect("No gateway shard for member");
-        gateway_channel.command(&UpdatePresence {
-            d: UpdatePresencePayload {
-                activities: Vec::new(),
-                afk: false,
-                since: None,
-                status,
-            },
-            op: OpCode::PresenceUpdate,
-        }).expect("Could not send command to gateway");
-
-        self.last_presence.insert(member.name.clone(), status);
-    }
-}
-
-
-
-impl crate::config::Member {
-    pub fn matches_proxy_prefix<'a>(&self, message: &'a Message) -> Option<&'a str> {
-        match self.message_pattern.captures(message.content.as_str()) {
-            None => None,
-            Some(captures) => {
-                match captures.name("content") {
-                    None => None,
-                    Some(matched_content) => Some(matched_content.as_str()),
-                }
-            }
-        }
-    }
-}
-
-#[derive(Debug)]
-enum MessageDuplicateError {
-    MessageValidation(twilight_validate::message::MessageValidationError),
-    AttachmentRequest(reqwest::Error),
-    MessageCreate(twilight_http::error::Error),
-    ResponseDeserialization(twilight_http::response::DeserializeBodyError)
-}
-
-impl From<twilight_validate::message::MessageValidationError> for MessageDuplicateError {
-    fn from(value: twilight_validate::message::MessageValidationError) -> Self {
-        MessageDuplicateError::MessageValidation(value)
-    }
-}
-
-impl From<reqwest::Error> for MessageDuplicateError {
-    fn from(value: reqwest::Error) -> Self {
-        MessageDuplicateError::AttachmentRequest(value)
-    }
-}
-
-impl From<twilight_http::error::Error> for MessageDuplicateError {
-    fn from(value: twilight_http::error::Error) -> Self {
-        MessageDuplicateError::MessageCreate(value)
-    }
-}
-
-impl From<twilight_http::response::DeserializeBodyError> for MessageDuplicateError {
-    fn from(value: twilight_http::response::DeserializeBodyError) -> Self {
-        MessageDuplicateError::ResponseDeserialization(value)
-    }
-}