diff options
author | Ashelyn Rose <git@ashen.earth> | 2025-02-28 21:52:16 -0700 |
---|---|---|
committer | Ashelyn Rose <git@ashen.earth> | 2025-02-28 21:52:16 -0700 |
commit | e9253bd959bf5bf6e8bcc6de4db247895b015a16 (patch) | |
tree | 1817aed80c8255d1292007a81e7f0a16138d25cc /src/system/aggregator.rs | |
parent | 5cb49a76c2cedb500b82f405af3cf1dcc0507f98 (diff) |
Partial refactor
Handles message prefixes and autoproxy
Diffstat (limited to 'src/system/aggregator.rs')
-rw-r--r-- | src/system/aggregator.rs | 177 |
1 files changed, 80 insertions, 97 deletions
diff --git a/src/system/aggregator.rs b/src/system/aggregator.rs index 52b1a8c..822c698 100644 --- a/src/system/aggregator.rs +++ b/src/system/aggregator.rs @@ -1,114 +1,97 @@ use lru::LruCache; -use std::sync::Arc; -use tokio::sync::RwLock; use std::num::NonZeroUsize; -use tokio::sync::mpsc::{channel, Receiver, Sender}; -use twilight_model::channel::Message as TwiMessage; +use tokio::sync::mpsc::{channel, Receiver}; +use twilight_model::{util::Timestamp, channel::Message, gateway::payload::incoming::MessageUpdate, id::{marker::MessageMarker, Id}}; +use crate::system::types::System; -use super::{MemberId, Message as GatewayMessage, MessageEvent, MessageId, SystemEvent}; +type DiscordToken = String; -pub struct AggregatorState { - rx: Receiver<MessageEvent>, - tx: Sender<MessageEvent>, - message_cache: lru::LruCache<MessageId, (TwiMessage, MemberId)>, - system_emitter: Option<Sender<SystemEvent>>, -} +pub struct MessageAggregator; -pub struct MessageAggregator { - state: Arc<RwLock<AggregatorState>>, +enum IncomingMessage { + Complete {message: Message, timestamp: Timestamp, seen_by: DiscordToken}, + Partial {message: MessageUpdate, timestamp: Timestamp, seen_by: DiscordToken}, } impl MessageAggregator { - pub fn new(system_size: usize) -> Self { - let buf_size = std::cmp::max(system_size * 2, 1); - let (tx, rx) = channel::<MessageEvent>(buf_size); - - Self { - state: Arc::new(RwLock::new( AggregatorState { - tx, - rx, - message_cache: LruCache::new(NonZeroUsize::new(buf_size).unwrap()), - system_emitter: None, - - })) + pub fn start(system: &System) -> Receiver<(Message, DiscordToken)> { + let incoming_channel = channel::<IncomingMessage>(32); + let outgoing_channel = channel::<(Message, DiscordToken)>(32); + + // Start incoming task for each member + for member in system.members.iter() { + let followed_user = system.followed_user.clone(); + let sender = incoming_channel.0.clone(); + let member = member.clone(); + + tokio::spawn(async move { + loop { + let next_event = {member.shard.lock().await.next_event().await}; + + match next_event { + Err(err) => println!("Error"), + Ok(twilight_gateway::Event::MessageCreate(message)) => { + if message.author.id == followed_user { + sender.send(IncomingMessage::Complete { + message: message.0.clone(), + timestamp: message.timestamp.clone(), + seen_by: member.discord_token.clone() + }).await; + } + }, + Ok(twilight_gateway::Event::MessageUpdate(message)) => { + if message.author.is_some() && {message.author.as_ref().unwrap().id == followed_user} { + sender.send(IncomingMessage::Partial { + message: (*message).clone(), + timestamp: message.edited_timestamp.unwrap_or(message.timestamp.expect("No message timestamp")), + seen_by: member.discord_token.clone() + }).await; + } + }, + _ => {} + } + } + }); } - } - - pub async fn get_sender(&self) -> Sender<MessageEvent> { - self.state.read().await.tx.clone() - } - - pub async fn set_system_handler(&mut self, emitter: Sender<SystemEvent>) -> () { - self.state.write().await.system_emitter = Some(emitter); - } - - // We probably don't actully need this since we've got a separate sent-cache by channel - // pub async fn lookup_message(&self, message_id: MessageId) -> Option<TwiMessage> { - // self.state.write().await.message_cache.get(&message_id).map(|m| m.clone()) - // } - - pub fn start(&self) -> () { - let state = self.state.clone(); + // Start deduping task + let mut message_cache = LruCache::<Id<MessageMarker>, (Message, Timestamp, DiscordToken)>::new(NonZeroUsize::new(32).unwrap()); + let mut receiver = incoming_channel.1; + let sender = outgoing_channel.0; + let system = system.clone(); tokio::spawn(async move { loop { - let system_emitter = { state.read().await.system_emitter.clone().expect("No system emitter") }; - let self_emitter = { state.read().await.tx.clone() }; - let next_event = { state.write().await.rx.recv().await }; - - - match next_event { + match receiver.recv().await { None => (), - Some((timestamp, message)) => { - match message { - GatewayMessage::Partial(current_partial, member_id) => { - let cache_content = { state.write().await.message_cache.get(¤t_partial.id).map(|m| m.clone()) }; - match cache_content { - Some((original_message, member_id)) => { - - let mut updated_message = original_message.clone(); - if let Some(edited_time) = current_partial.edited_timestamp { - updated_message.edited_timestamp = Some(edited_time); - } - - if let Some(content) = ¤t_partial.content { - updated_message.content = content.clone() - } - - self_emitter.send((timestamp, GatewayMessage::Complete(updated_message, member_id))).await; - }, - None => { - system_emitter.send( - SystemEvent::RefetchMessage(member_id, current_partial.id, current_partial.channel_id) - ).await; - }, - }; - }, - GatewayMessage::Complete(message, member_id) => { - let previous_message = { state.write().await.message_cache.get(&message.id).map(|m| m.clone()) }; - - if let Some((previous_message, _last_seen_by)) = previous_message { - let previous_timestamp = previous_message.edited_timestamp.unwrap_or(previous_message.timestamp); - let current_timestamp = message.edited_timestamp.unwrap_or(message.timestamp); - - // Should we skip sending - if previous_timestamp.as_micros() >= current_timestamp.as_micros() { - continue - } - - // If not, fall through to update stored message - } - - { state.write().await.message_cache.put(message.id, (message.clone(), member_id)); }; - - system_emitter - .send(SystemEvent::NewMessage(timestamp, message, member_id)) - .await; - }, - }; - } - } + Some(IncomingMessage::Complete { message, timestamp, seen_by }) => { + if let None = message_cache.get(&message.id) { + message_cache.put(message.id.clone(), (message.clone(), timestamp, seen_by.clone())); + sender.send((message, seen_by)).await; + } + + }, + Some(IncomingMessage::Partial { message, timestamp, seen_by }) => { + if let Some((previous_message, previous_timestamp, previously_seen_by)) = message_cache.get(&message.id) { + if previous_timestamp.as_micros() < timestamp.as_micros() { + let mut updated_message = previous_message.clone(); + updated_message.content = message.content.unwrap_or(updated_message.content); + message_cache.put(message.id.clone(), (updated_message.clone(), timestamp, seen_by.clone())); + sender.send((updated_message, seen_by)).await; + } + } else { + let client = system.members.iter().find(|m| m.discord_token == seen_by).map(|m| m.client.clone()).expect("Could not find client"); + if let Ok(updated_message) = client.lock().await.message(message.channel_id, message.id).await.unwrap().model().await.map(|r|r.clone()) { + message_cache.put(message.id.clone(), (updated_message.clone(), timestamp, seen_by.clone())); + sender.send((updated_message, seen_by)).await; + }; + } + + }, + }; } }); + + return outgoing_channel.1 + } } |