use lru::LruCache; use std::num::NonZeroUsize; use tokio::sync::mpsc::{channel, Receiver}; use twilight_model::{util::Timestamp, channel::Message, gateway::payload::incoming::MessageUpdate, id::{marker::MessageMarker, Id}}; use crate::system::types::System; type DiscordToken = String; pub struct MessageAggregator; enum IncomingMessage { Complete {message: Message, timestamp: Timestamp, seen_by: DiscordToken}, Partial {message: MessageUpdate, timestamp: Timestamp, seen_by: DiscordToken}, } impl MessageAggregator { pub fn start(system: &System) -> Receiver<(Message, DiscordToken)> { let incoming_channel = channel::(32); let outgoing_channel = channel::<(Message, DiscordToken)>(32); // Start incoming task for each member for member in system.members.iter() { let followed_user = system.followed_user.clone(); let sender = incoming_channel.0.clone(); let member = member.clone(); tokio::spawn(async move { loop { let next_event = {member.shard.lock().await.next_event().await}; match next_event { Err(err) => println!("Error"), Ok(twilight_gateway::Event::MessageCreate(message)) => { if message.author.id == followed_user { sender.send(IncomingMessage::Complete { message: message.0.clone(), timestamp: message.timestamp.clone(), seen_by: member.discord_token.clone() }).await; } }, Ok(twilight_gateway::Event::MessageUpdate(message)) => { if message.author.is_some() && {message.author.as_ref().unwrap().id == followed_user} { sender.send(IncomingMessage::Partial { message: (*message).clone(), timestamp: message.edited_timestamp.unwrap_or(message.timestamp.expect("No message timestamp")), seen_by: member.discord_token.clone() }).await; } }, _ => {} } } }); } // Start deduping task let mut message_cache = LruCache::, (Message, Timestamp, DiscordToken)>::new(NonZeroUsize::new(32).unwrap()); let mut receiver = incoming_channel.1; let sender = outgoing_channel.0; let system = system.clone(); tokio::spawn(async move { loop { match receiver.recv().await { None => (), Some(IncomingMessage::Complete { message, timestamp, seen_by }) => { if let None = message_cache.get(&message.id) { message_cache.put(message.id.clone(), (message.clone(), timestamp, seen_by.clone())); sender.send((message, seen_by)).await; } }, Some(IncomingMessage::Partial { message, timestamp, seen_by }) => { if let Some((previous_message, previous_timestamp, previously_seen_by)) = message_cache.get(&message.id) { if previous_timestamp.as_micros() < timestamp.as_micros() { let mut updated_message = previous_message.clone(); updated_message.content = message.content.unwrap_or(updated_message.content); message_cache.put(message.id.clone(), (updated_message.clone(), timestamp, seen_by.clone())); sender.send((updated_message, seen_by)).await; } } else { let client = system.members.iter().find(|m| m.discord_token == seen_by).map(|m| m.client.clone()).expect("Could not find client"); if let Ok(updated_message) = client.lock().await.message(message.channel_id, message.id).await.unwrap().model().await.map(|r|r.clone()) { message_cache.put(message.id.clone(), (updated_message.clone(), timestamp, seen_by.clone())); sender.send((updated_message, seen_by)).await; }; } }, }; } }); return outgoing_channel.1 } }