summary refs log tree commit diff
path: root/src/system/aggregator.rs
diff options
context:
space:
mode:
authorAshelyn Rose <git@ashen.earth>2025-02-28 21:52:16 -0700
committerAshelyn Rose <git@ashen.earth>2025-02-28 21:52:16 -0700
commite9253bd959bf5bf6e8bcc6de4db247895b015a16 (patch)
tree1817aed80c8255d1292007a81e7f0a16138d25cc /src/system/aggregator.rs
parent5cb49a76c2cedb500b82f405af3cf1dcc0507f98 (diff)
Partial refactor
Handles message prefixes and autoproxy
Diffstat (limited to 'src/system/aggregator.rs')
-rw-r--r--src/system/aggregator.rs177
1 files changed, 80 insertions, 97 deletions
diff --git a/src/system/aggregator.rs b/src/system/aggregator.rs
index 52b1a8c..822c698 100644
--- a/src/system/aggregator.rs
+++ b/src/system/aggregator.rs
@@ -1,114 +1,97 @@
 use lru::LruCache;
-use std::sync::Arc;
-use tokio::sync::RwLock;
 use std::num::NonZeroUsize;
-use tokio::sync::mpsc::{channel, Receiver, Sender};
-use twilight_model::channel::Message as TwiMessage;
+use tokio::sync::mpsc::{channel, Receiver};
+use twilight_model::{util::Timestamp, channel::Message, gateway::payload::incoming::MessageUpdate, id::{marker::MessageMarker, Id}};
+use crate::system::types::System;
 
-use super::{MemberId, Message as GatewayMessage, MessageEvent, MessageId, SystemEvent};
+type DiscordToken = String;
 
-pub struct AggregatorState {
-    rx: Receiver<MessageEvent>,
-    tx: Sender<MessageEvent>,
-    message_cache: lru::LruCache<MessageId, (TwiMessage, MemberId)>,
-    system_emitter: Option<Sender<SystemEvent>>,
-}
+pub struct MessageAggregator;
 
-pub struct MessageAggregator {
-    state: Arc<RwLock<AggregatorState>>,
+enum IncomingMessage {
+    Complete {message: Message, timestamp: Timestamp, seen_by: DiscordToken},
+    Partial {message: MessageUpdate, timestamp: Timestamp, seen_by: DiscordToken},
 }
 
 impl MessageAggregator {
-    pub fn new(system_size: usize) -> Self {
-        let buf_size = std::cmp::max(system_size * 2, 1);
-        let (tx, rx) = channel::<MessageEvent>(buf_size);
-
-        Self {
-            state: Arc::new(RwLock::new( AggregatorState {
-                tx,
-                rx,
-                message_cache: LruCache::new(NonZeroUsize::new(buf_size).unwrap()),
-                system_emitter: None,
-
-            }))
+    pub fn start(system: &System) -> Receiver<(Message, DiscordToken)> {
+        let incoming_channel = channel::<IncomingMessage>(32);
+        let outgoing_channel = channel::<(Message, DiscordToken)>(32);
+
+        // Start incoming task for each member
+        for member in system.members.iter() {
+            let followed_user = system.followed_user.clone();
+            let sender = incoming_channel.0.clone();
+            let member = member.clone();
+
+            tokio::spawn(async move {
+                loop {
+                    let next_event = {member.shard.lock().await.next_event().await};
+
+                    match next_event {
+                        Err(err) => println!("Error"),
+                        Ok(twilight_gateway::Event::MessageCreate(message)) => {
+                            if message.author.id == followed_user {
+                                sender.send(IncomingMessage::Complete {
+                                    message: message.0.clone(),
+                                    timestamp: message.timestamp.clone(),
+                                    seen_by: member.discord_token.clone()
+                                }).await;
+                            }
+                        },
+                        Ok(twilight_gateway::Event::MessageUpdate(message)) => {
+                            if message.author.is_some() && {message.author.as_ref().unwrap().id == followed_user} {
+                                sender.send(IncomingMessage::Partial {
+                                    message: (*message).clone(),
+                                    timestamp: message.edited_timestamp.unwrap_or(message.timestamp.expect("No message timestamp")),
+                                    seen_by: member.discord_token.clone()
+                                }).await;
+                            }
+                        },
+                        _ => {}
+                    }
+                }
+            });
         }
-    }
-
-    pub async fn get_sender(&self) -> Sender<MessageEvent> {
-        self.state.read().await.tx.clone()
-    }
-
-    pub async fn set_system_handler(&mut self, emitter: Sender<SystemEvent>) -> () {
-        self.state.write().await.system_emitter = Some(emitter);
-    }
-
-    // We probably don't actully need this since we've got a separate sent-cache by channel
-    // pub async fn lookup_message(&self, message_id: MessageId) -> Option<TwiMessage> {
-    //     self.state.write().await.message_cache.get(&message_id).map(|m| m.clone())
-    // }
-
-    pub fn start(&self) -> () {
-        let state = self.state.clone();
 
+        // Start deduping task
+        let mut message_cache = LruCache::<Id<MessageMarker>, (Message, Timestamp, DiscordToken)>::new(NonZeroUsize::new(32).unwrap());
+        let mut receiver = incoming_channel.1;
+        let sender = outgoing_channel.0;
+        let system = system.clone();
         tokio::spawn(async move {
             loop {
-                let system_emitter = { state.read().await.system_emitter.clone().expect("No system emitter") };
-                let self_emitter = { state.read().await.tx.clone() };
-                let next_event = { state.write().await.rx.recv().await };
-
-
-                match next_event {
+                match receiver.recv().await {
                     None => (),
-                    Some((timestamp, message)) => {
-                        match message {
-                            GatewayMessage::Partial(current_partial, member_id) => {
-                                let cache_content = { state.write().await.message_cache.get(&current_partial.id).map(|m| m.clone()) };
-                                match cache_content {
-                                    Some((original_message, member_id)) => {
-
-                                        let mut updated_message = original_message.clone();
-                                        if let Some(edited_time) = current_partial.edited_timestamp {
-                                            updated_message.edited_timestamp = Some(edited_time);
-                                        }
-
-                                        if let Some(content) = &current_partial.content {
-                                            updated_message.content = content.clone()
-                                        }
-
-                                        self_emitter.send((timestamp, GatewayMessage::Complete(updated_message, member_id))).await;
-                                    },
-                                    None => {
-                                        system_emitter.send(
-                                            SystemEvent::RefetchMessage(member_id, current_partial.id, current_partial.channel_id)
-                                        ).await;
-                                    },
-                                };
-                            },
-                            GatewayMessage::Complete(message, member_id) => {
-                                let previous_message = { state.write().await.message_cache.get(&message.id).map(|m| m.clone()) };
-
-                                if let Some((previous_message, _last_seen_by)) = previous_message {
-                                    let previous_timestamp = previous_message.edited_timestamp.unwrap_or(previous_message.timestamp);
-                                    let current_timestamp = message.edited_timestamp.unwrap_or(message.timestamp);
-
-                                    // Should we skip sending
-                                    if previous_timestamp.as_micros() >= current_timestamp.as_micros() {
-                                        continue
-                                    }
-                                    
-                                    // If not, fall through to update stored message
-                                }
-
-                                { state.write().await.message_cache.put(message.id, (message.clone(), member_id)); };
-
-                                system_emitter
-                                    .send(SystemEvent::NewMessage(timestamp, message, member_id))
-                                    .await;
-                            },
-                        };
-                    }
-                }
+                    Some(IncomingMessage::Complete { message, timestamp, seen_by }) => {
+                        if let None = message_cache.get(&message.id) {
+                            message_cache.put(message.id.clone(), (message.clone(), timestamp, seen_by.clone()));
+                            sender.send((message, seen_by)).await;
+                        }
+                        
+                    },
+                    Some(IncomingMessage::Partial { message, timestamp, seen_by }) => {
+                        if let Some((previous_message, previous_timestamp, previously_seen_by)) = message_cache.get(&message.id) {
+                            if previous_timestamp.as_micros() < timestamp.as_micros() {
+                                let mut updated_message = previous_message.clone();
+                                updated_message.content = message.content.unwrap_or(updated_message.content);
+                                message_cache.put(message.id.clone(), (updated_message.clone(), timestamp, seen_by.clone()));
+                                sender.send((updated_message, seen_by)).await;
+                            }
+                        } else {
+                            let client = system.members.iter().find(|m| m.discord_token == seen_by).map(|m| m.client.clone()).expect("Could not find client");
+                            if let Ok(updated_message) = client.lock().await.message(message.channel_id, message.id).await.unwrap().model().await.map(|r|r.clone()) {
+                                message_cache.put(message.id.clone(), (updated_message.clone(), timestamp, seen_by.clone()));
+                                sender.send((updated_message, seen_by)).await;
+                            };
+                        }
+
+                    },
+                };
             }
         });
+
+        return outgoing_channel.1
+
     }
 }