From a6a120ae8b8ed08b0801d76e80a5f7a0b8cde44b Mon Sep 17 00:00:00 2001 From: Ashelyn Dawn Date: Tue, 1 Oct 2024 15:37:09 -0600 Subject: Refactor gateway and message aggregation --- src/system/aggregator.rs | 61 ++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 61 insertions(+) create mode 100644 src/system/aggregator.rs (limited to 'src/system/aggregator.rs') diff --git a/src/system/aggregator.rs b/src/system/aggregator.rs new file mode 100644 index 0000000..0177249 --- /dev/null +++ b/src/system/aggregator.rs @@ -0,0 +1,61 @@ +use lru::LruCache; +use std::num::NonZeroUsize; +use tokio::sync::mpsc::{channel, Receiver, Sender}; + +use super::{MessageEvent, MessageId, SystemEvent}; + +pub struct MessageAggregator { + rx: Receiver, + tx: Sender, + message_cache: lru::LruCache, + emitter: Option>, +} + +impl MessageAggregator { + pub fn new() -> Self { + let (tx, rx) = channel::(100); + + Self { + tx, + rx, + message_cache: LruCache::new(NonZeroUsize::new(100).unwrap()), + emitter: None, + } + } + + pub fn get_sender(&self) -> Sender { + self.tx.clone() + } + + pub fn set_handler(&mut self, emitter: Sender) -> () { + self.emitter = Some(emitter); + } + + pub fn start(mut self) -> () { + tokio::spawn(async move { + loop { + match self.rx.recv().await { + None => return, + Some((timestamp, message)) => { + let last_seen_timestamp = self.message_cache.get(&message.id); + let current_timestamp = timestamp; + + if last_seen_timestamp.is_none() + || last_seen_timestamp.unwrap().0.as_micros() + < current_timestamp.as_micros() + { + self.message_cache + .put(message.id, (timestamp, message.clone())); + + if let Some(emitter) = &self.emitter { + emitter + .send(SystemEvent::NewMessage((timestamp, message))) + .await; + } + }; + } + } + } + }); + } +} -- cgit 1.4.1