summary refs log tree commit diff
path: root/src/system/aggregator.rs
diff options
context:
space:
mode:
authorAshelyn Rose <git@ashen.earth>2024-10-03 03:18:26 -0600
committerAshelyn Rose <git@ashen.earth>2024-10-03 03:18:26 -0600
commit0053ccbb31c3b87285bf38ee3eda3308c67ad707 (patch)
tree63c103173127ba691922fe4258e0ebdc6f266a6e /src/system/aggregator.rs
parent8b716d49ed019213d91a45f094684f26fac289bd (diff)
Refactor bot into separate client + gateway
Diffstat (limited to 'src/system/aggregator.rs')
-rw-r--r--src/system/aggregator.rs73
1 files changed, 52 insertions, 21 deletions
diff --git a/src/system/aggregator.rs b/src/system/aggregator.rs
index 0177249..6873272 100644
--- a/src/system/aggregator.rs
+++ b/src/system/aggregator.rs
@@ -1,14 +1,15 @@
 use lru::LruCache;
 use std::num::NonZeroUsize;
 use tokio::sync::mpsc::{channel, Receiver, Sender};
+use twilight_model::channel::Message as TwiMessage;
 
-use super::{MessageEvent, MessageId, SystemEvent};
+use super::{Message as GatewayMessage, MessageEvent, MessageId, SystemEvent};
 
 pub struct MessageAggregator {
     rx: Receiver<MessageEvent>,
     tx: Sender<MessageEvent>,
-    message_cache: lru::LruCache<MessageId, MessageEvent>,
-    emitter: Option<Sender<SystemEvent>>,
+    message_cache: lru::LruCache<MessageId, TwiMessage>,
+    system_emitter: Option<Sender<SystemEvent>>,
 }
 
 impl MessageAggregator {
@@ -19,7 +20,7 @@ impl MessageAggregator {
             tx,
             rx,
             message_cache: LruCache::new(NonZeroUsize::new(100).unwrap()),
-            emitter: None,
+            system_emitter: None,
         }
     }
 
@@ -27,31 +28,61 @@ impl MessageAggregator {
         self.tx.clone()
     }
 
-    pub fn set_handler(&mut self, emitter: Sender<SystemEvent>) -> () {
-        self.emitter = Some(emitter);
+    pub fn set_system_handler(&mut self, emitter: Sender<SystemEvent>) -> () {
+        self.system_emitter = Some(emitter);
     }
 
     pub fn start(mut self) -> () {
         tokio::spawn(async move {
             loop {
                 match self.rx.recv().await {
-                    None => return,
+                    None => (),
                     Some((timestamp, message)) => {
-                        let last_seen_timestamp = self.message_cache.get(&message.id);
-                        let current_timestamp = timestamp;
-
-                        if last_seen_timestamp.is_none()
-                            || last_seen_timestamp.unwrap().0.as_micros()
-                                < current_timestamp.as_micros()
-                        {
-                            self.message_cache
-                                .put(message.id, (timestamp, message.clone()));
-
-                            if let Some(emitter) = &self.emitter {
-                                emitter
-                                    .send(SystemEvent::NewMessage((timestamp, message)))
+                        let system_emitter = &self.system_emitter.clone().expect("No system emitter");
+                        match message {
+                            GatewayMessage::Partial(current_partial, member_id) => {
+                                match self.message_cache.get(&current_partial.id) {
+                                    Some(original_message) => {
+
+                                        let mut updated_message = original_message.clone();
+                                        if let Some(edited_time) = current_partial.edited_timestamp {
+                                            updated_message.edited_timestamp = Some(edited_time);
+                                        }
+
+                                        if let Some(content) = current_partial.content {
+                                            updated_message.content = content
+                                        }
+
+                                        self.tx.send((timestamp, GatewayMessage::Complete(updated_message))).await;
+                                    },
+                                    None => {
+                                        system_emitter.send(
+                                            SystemEvent::RefetchMessage(member_id, current_partial.id, current_partial.channel_id)
+                                        ).await;
+                                    },
+                                };
+                            },
+                            GatewayMessage::Complete(message) => {
+                                let previous_message = self.message_cache.get(&message.id);
+
+                                if let Some(previous_message) = previous_message.cloned() {
+                                    let previous_timestamp = previous_message.edited_timestamp.unwrap_or(previous_message.timestamp);
+                                    let current_timestamp = message.edited_timestamp.unwrap_or(message.timestamp);
+
+                                    // Should we skip sending
+                                    if previous_timestamp.as_micros() >= current_timestamp.as_micros() {
+                                        continue
+                                    }
+                                    
+                                    // If not, fall through to update stored message
+                                }
+
+                                self.message_cache.put(message.id, message.clone());
+
+                                self.system_emitter.as_ref().expect("Aggregator has no system emitter")
+                                    .send(SystemEvent::NewMessage(timestamp, message))
                                     .await;
-                            }
+                            },
                         };
                     }
                 }