summary refs log tree commit diff
path: root/src/system/aggregator.rs
blob: f73dec54d322013e10c3cc38b63b338d015fcdaf (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
use lru::LruCache;
use std::num::NonZeroUsize;
use tokio::sync::mpsc::{channel, Receiver};
use twilight_model::{channel::Message, gateway::payload::incoming::MessageUpdate, id::{marker::{MessageMarker, UserMarker}, Id}, util::Timestamp};
use crate::system::types::System;

/// Alias for the token string that tags events with the member whose gateway
/// connection produced them. Values in this file are always copies of a member's
/// `discord_token` field.
type DiscordToken = String;

/// Field-less handle type for the message aggregation pipeline.
///
/// It carries no state; in this file its only associated item is
/// `MessageAggregator::start`, which spawns the background tasks.
pub struct MessageAggregator;

/// Internal hand-off from the per-member gateway tasks to the deduplication task.
enum IncomingMessage {
    /// A full message, built from a gateway `MessageCreate` event.
    Complete {
        /// The message payload as received.
        message: Message,
        /// Timestamp associated with this version of the message.
        timestamp: Timestamp,
        /// Token of the member whose gateway connection received the event.
        seen_by: DiscordToken,
    },
    /// A possibly incomplete message, built from a gateway `MessageUpdate` event.
    Partial {
        /// The update payload as received; its fields may be absent.
        message: MessageUpdate,
        /// Timestamp associated with this version of the message.
        timestamp: Timestamp,
        /// Token of the member whose gateway connection received the event.
        seen_by: DiscordToken,
    },
}

/// Events delivered to the consumer of the receiver returned by
/// `MessageAggregator::start`.
pub enum MemberEvent {
    /// A new or updated message from the followed user, together with the token of
    /// the member whose gateway connection reported it.
    Message(Message, DiscordToken),
    /// The member's gateway reported `Ready`; carries the member token and the user
    /// id taken from the ready payload.
    GatewayConnect(DiscordToken, Id<UserMarker>),
    /// The member's gateway returned a fatal error; that member's listening task
    /// stops after emitting this event.
    GatewayDisconnect(DiscordToken),
    /// The member's gateway returned a non-fatal error; its listening task keeps
    /// running.
    GatewayError(DiscordToken),
}

impl MessageAggregator {
    /// Spawns the aggregation pipeline for `system` and returns the receiving end of
    /// the resulting event stream.
    ///
    /// Two kinds of background tasks are started:
    ///
    /// * One listener task per entry in `system.members`. It polls that member's
    ///   gateway shard, reports connection state (`GatewayConnect`, `GatewayError`,
    ///   `GatewayDisconnect`) directly on the returned channel, and forwards message
    ///   creations/updates authored by `system.followed_user` to the deduplication
    ///   task. A listener stops after a fatal gateway error.
    /// * One deduplication task. Several members may observe the same message, so it
    ///   keeps a small LRU cache keyed by message id and only emits
    ///   `MemberEvent::Message` for the first sighting of a message or for an update
    ///   that is newer than the cached version. An update for a message that is not
    ///   cached is resolved by fetching the full message through the client of the
    ///   member that saw it.
    ///
    /// The deduplication task ends once every listener has stopped; after all senders
    /// are gone the returned receiver yields `None`.
    ///
    /// Errors while sending on either channel, and failures to fetch or decode a
    /// message, are ignored on purpose: the affected event is simply dropped.
    ///
    /// # Panics
    ///
    /// Uses `tokio::spawn`, so it must be called from within a Tokio runtime.
    pub fn start(system: &System) -> Receiver<MemberEvent> {
        /// Buffer size of both the internal and the outgoing channel.
        const CHANNEL_CAPACITY: usize = 32;
        /// Number of recently seen messages remembered for deduplication.
        const CACHE_CAPACITY: usize = 32;

        let (incoming_tx, mut incoming_rx) = channel::<IncomingMessage>(CHANNEL_CAPACITY);
        let (outgoing_tx, outgoing_rx) = channel::<MemberEvent>(CHANNEL_CAPACITY);

        // Start incoming task for each member
        for member in system.members.iter() {
            let followed_user = system.followed_user.clone();
            let incoming_tx = incoming_tx.clone();
            let outgoing_tx = outgoing_tx.clone();
            let member = member.clone();

            tokio::spawn(async move {
                loop {
                    // The shard guard is a temporary of this statement, so the lock is
                    // released before the event is handled.
                    let next_event = { member.shard.lock().await.next_event().await };

                    match next_event {
                        Err(source) => {
                            if source.is_fatal() {
                                let _ = outgoing_tx
                                    .send(MemberEvent::GatewayDisconnect(
                                        member.discord_token.clone(),
                                    ))
                                    .await;
                                return;
                            }
                            let _ = outgoing_tx
                                .send(MemberEvent::GatewayError(member.discord_token.clone()))
                                .await;
                        }
                        Ok(twilight_gateway::Event::Ready(ready)) => {
                            let _ = outgoing_tx
                                .send(MemberEvent::GatewayConnect(
                                    member.discord_token.clone(),
                                    ready.user.id,
                                ))
                                .await;
                        }
                        Ok(twilight_gateway::Event::MessageCreate(message)) => {
                            if message.author.id == followed_user {
                                let _ = incoming_tx
                                    .send(IncomingMessage::Complete {
                                        message: message.0.clone(),
                                        timestamp: message.timestamp,
                                        seen_by: member.discord_token.clone(),
                                    })
                                    .await;
                            }
                        }
                        Ok(twilight_gateway::Event::MessageUpdate(message)) => {
                            let from_followed_user = message
                                .author
                                .as_ref()
                                .map_or(false, |author| author.id == followed_user);

                            if from_followed_user {
                                // Prefer the edit time and fall back to the creation
                                // time. An update carrying neither cannot be ordered
                                // against a cached copy, so it is skipped instead of
                                // panicking this task.
                                let timestamp = message.edited_timestamp.or(message.timestamp);
                                if let Some(timestamp) = timestamp {
                                    let _ = incoming_tx
                                        .send(IncomingMessage::Partial {
                                            message: (*message).clone(),
                                            timestamp,
                                            seen_by: member.discord_token.clone(),
                                        })
                                        .await;
                                }
                            }
                        }
                        _ => {}
                    }
                }
            });
        }

        // Start deduping task
        let mut message_cache =
            LruCache::<Id<MessageMarker>, (Message, Timestamp, DiscordToken)>::new(
                NonZeroUsize::new(CACHE_CAPACITY).expect("cache capacity is non-zero"),
            );
        let system = system.clone();
        tokio::spawn(async move {
            // `recv` yields `None` once every listener has dropped its sender. Leaving
            // the loop then avoids spinning on a closed channel and drops
            // `outgoing_tx`, which lets the consumer observe the end of the stream.
            while let Some(incoming) = incoming_rx.recv().await {
                match incoming {
                    IncomingMessage::Complete {
                        message,
                        timestamp,
                        seen_by,
                    } => {
                        // Only the first member to report a message gets it forwarded.
                        if message_cache.get(&message.id).is_none() {
                            message_cache
                                .put(message.id, (message.clone(), timestamp, seen_by.clone()));
                            let _ = outgoing_tx
                                .send(MemberEvent::Message(message, seen_by))
                                .await;
                        }
                    }
                    IncomingMessage::Partial {
                        message,
                        timestamp,
                        seen_by,
                    } => match message_cache.get(&message.id) {
                        Some((previous_message, previous_timestamp, _)) => {
                            // Ignore edits that were already forwarded, e.g. the same
                            // update reported by another member.
                            if previous_timestamp.as_micros() < timestamp.as_micros() {
                                let mut updated_message = previous_message.clone();
                                if let Some(content) = message.content {
                                    updated_message.content = content;
                                }
                                message_cache.put(
                                    message.id,
                                    (updated_message.clone(), timestamp, seen_by.clone()),
                                );
                                let _ = outgoing_tx
                                    .send(MemberEvent::Message(updated_message, seen_by))
                                    .await;
                            }
                        }
                        None => {
                            // Nothing cached to patch: fetch the full message using the
                            // client of the member that saw the update.
                            let client = system
                                .members
                                .iter()
                                .find(|candidate| candidate.discord_token == seen_by)
                                .map(|candidate| candidate.client.clone());
                            let client = match client {
                                Some(client) => client,
                                // No matching member: drop the update rather than
                                // taking the whole task down.
                                None => continue,
                            };

                            // Hold the client lock only for the request itself, not
                            // while decoding the body or waiting for channel capacity.
                            let guard = client.lock().await;
                            let response =
                                guard.message(message.channel_id, message.id).await;
                            drop(guard);

                            let fetched = match response {
                                Ok(response) => response.model().await,
                                // Request failed: best effort, skip this update.
                                Err(_) => continue,
                            };
                            if let Ok(updated_message) = fetched {
                                message_cache.put(
                                    message.id,
                                    (updated_message.clone(), timestamp, seen_by.clone()),
                                );
                                let _ = outgoing_tx
                                    .send(MemberEvent::Message(updated_message, seen_by))
                                    .await;
                            }
                        }
                    },
                }
            }
        });

        outgoing_rx
    }
}