summary refs log tree commit diff
path: root/src/system/aggregator.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/system/aggregator.rs')
-rw-r--r--src/system/aggregator.rs61
1 files changed, 61 insertions, 0 deletions
diff --git a/src/system/aggregator.rs b/src/system/aggregator.rs
new file mode 100644
index 0000000..0177249
--- /dev/null
+++ b/src/system/aggregator.rs
@@ -0,0 +1,61 @@
+use lru::LruCache;
+use std::num::NonZeroUsize;
+use tokio::sync::mpsc::{channel, Receiver, Sender};
+
+use super::{MessageEvent, MessageId, SystemEvent};
+
+pub struct MessageAggregator {
+    rx: Receiver<MessageEvent>,
+    tx: Sender<MessageEvent>,
+    message_cache: lru::LruCache<MessageId, MessageEvent>,
+    emitter: Option<Sender<SystemEvent>>,
+}
+
+impl MessageAggregator {
+    pub fn new() -> Self {
+        let (tx, rx) = channel::<MessageEvent>(100);
+
+        Self {
+            tx,
+            rx,
+            message_cache: LruCache::new(NonZeroUsize::new(100).unwrap()),
+            emitter: None,
+        }
+    }
+
+    pub fn get_sender(&self) -> Sender<MessageEvent> {
+        self.tx.clone()
+    }
+
+    pub fn set_handler(&mut self, emitter: Sender<SystemEvent>) -> () {
+        self.emitter = Some(emitter);
+    }
+
+    pub fn start(mut self) -> () {
+        tokio::spawn(async move {
+            loop {
+                match self.rx.recv().await {
+                    None => return,
+                    Some((timestamp, message)) => {
+                        let last_seen_timestamp = self.message_cache.get(&message.id);
+                        let current_timestamp = timestamp;
+
+                        if last_seen_timestamp.is_none()
+                            || last_seen_timestamp.unwrap().0.as_micros()
+                                < current_timestamp.as_micros()
+                        {
+                            self.message_cache
+                                .put(message.id, (timestamp, message.clone()));
+
+                            if let Some(emitter) = &self.emitter {
+                                emitter
+                                    .send(SystemEvent::NewMessage((timestamp, message)))
+                                    .await;
+                            }
+                        };
+                    }
+                }
+            }
+        });
+    }
+}