summary refs log tree commit diff
path: root/src/system/aggregator.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/system/aggregator.rs')
-rw-r--r--src/system/aggregator.rs29
1 files changed, 15 insertions, 14 deletions
diff --git a/src/system/aggregator.rs b/src/system/aggregator.rs
index 00ba8e8..8fbdfdd 100644
--- a/src/system/aggregator.rs
+++ b/src/system/aggregator.rs
@@ -5,12 +5,12 @@ use std::num::NonZeroUsize;
 use tokio::sync::mpsc::{channel, Receiver, Sender};
 use twilight_model::channel::Message as TwiMessage;
 
-use super::{Message as GatewayMessage, MessageEvent, MessageId, SystemEvent};
+use super::{MemberId, Message as GatewayMessage, MessageEvent, MessageId, SystemEvent};
 
 pub struct AggregatorState {
     rx: Receiver<MessageEvent>,
     tx: Sender<MessageEvent>,
-    message_cache: lru::LruCache<MessageId, TwiMessage>,
+    message_cache: lru::LruCache<MessageId, (TwiMessage, MemberId)>,
     system_emitter: Option<Sender<SystemEvent>>,
 }
 
@@ -19,14 +19,14 @@ pub struct MessageAggregator {
 }
 
 impl MessageAggregator {
-    pub fn new() -> Self {
-        let (tx, rx) = channel::<MessageEvent>(100);
+    pub fn new(system_size: usize) -> Self {
+        let (tx, rx) = channel::<MessageEvent>(system_size * 2);
 
         Self {
             state: Arc::new(RwLock::new( AggregatorState {
                 tx,
                 rx,
-                message_cache: LruCache::new(NonZeroUsize::new(100).unwrap()),
+                message_cache: LruCache::new(NonZeroUsize::new(system_size * 2).unwrap()),
                 system_emitter: None,
 
             }))
@@ -41,9 +41,10 @@ impl MessageAggregator {
         self.state.write().await.system_emitter = Some(emitter);
     }
 
-    pub async fn lookup_message(&self, message_id: MessageId) -> Option<TwiMessage> {
-        self.state.write().await.message_cache.get(&message_id).map(|m| m.clone())
-    }
+    // We probably don't actully need this since we've got a separate sent-cache by channel
+    // pub async fn lookup_message(&self, message_id: MessageId) -> Option<TwiMessage> {
+    //     self.state.write().await.message_cache.get(&message_id).map(|m| m.clone())
+    // }
 
     pub fn start(&self) -> () {
         let state = self.state.clone();
@@ -62,7 +63,7 @@ impl MessageAggregator {
                             GatewayMessage::Partial(current_partial, member_id) => {
                                 let cache_content = { state.write().await.message_cache.get(&current_partial.id).map(|m| m.clone()) };
                                 match cache_content {
-                                    Some(original_message) => {
+                                    Some((original_message, member_id)) => {
 
                                         let mut updated_message = original_message.clone();
                                         if let Some(edited_time) = current_partial.edited_timestamp {
@@ -73,7 +74,7 @@ impl MessageAggregator {
                                             updated_message.content = content.clone()
                                         }
 
-                                        self_emitter.send((timestamp, GatewayMessage::Complete(updated_message))).await;
+                                        self_emitter.send((timestamp, GatewayMessage::Complete(updated_message, member_id))).await;
                                     },
                                     None => {
                                         system_emitter.send(
@@ -82,10 +83,10 @@ impl MessageAggregator {
                                     },
                                 };
                             },
-                            GatewayMessage::Complete(message) => {
+                            GatewayMessage::Complete(message, member_id) => {
                                 let previous_message = { state.write().await.message_cache.get(&message.id).map(|m| m.clone()) };
 
-                                if let Some(previous_message) = previous_message {
+                                if let Some((previous_message, _last_seen_by)) = previous_message {
                                     let previous_timestamp = previous_message.edited_timestamp.unwrap_or(previous_message.timestamp);
                                     let current_timestamp = message.edited_timestamp.unwrap_or(message.timestamp);
 
@@ -97,10 +98,10 @@ impl MessageAggregator {
                                     // If not, fall through to update stored message
                                 }
 
-                                { state.write().await.message_cache.put(message.id, message.clone()); };
+                                { state.write().await.message_cache.put(message.id, (message.clone(), member_id)); };
 
                                 system_emitter
-                                    .send(SystemEvent::NewMessage(timestamp, message))
+                                    .send(SystemEvent::NewMessage(timestamp, message, member_id))
                                     .await;
                             },
                         };