diff options
Diffstat (limited to 'src/system.rs')
-rw-r--r-- | src/system.rs | 110 |
1 files changed, 103 insertions, 7 deletions
diff --git a/src/system.rs b/src/system.rs index e3882ad..69d8b0e 100644 --- a/src/system.rs +++ b/src/system.rs @@ -1,9 +1,10 @@ -use std::{collections::HashMap, num::NonZeroUsize, str::FromStr}; +use std::{collections::HashMap, num::NonZeroUsize, str::FromStr, time::Duration}; -use tokio::sync::mpsc::channel; +use tokio::{sync::mpsc::{channel, Receiver, Sender}, time::{Sleep,sleep}}; use futures::future::join_all; +use twilight_gateway::MessageSender; use twilight_http::Client; -use twilight_model::{channel::{message::MessageType, Message}, id::{Id, marker::{ChannelMarker, MessageMarker, UserMarker}}}; +use twilight_model::{channel::{message::MessageType, Message}, gateway::{payload::outgoing::{update_presence::UpdatePresencePayload, UpdatePresence}, presence::Status, OpCode}, id::{Id, marker::{ChannelMarker, MessageMarker, UserMarker}}}; use twilight_model::util::Timestamp; use twilight_model::http::attachment::Attachment; @@ -15,6 +16,9 @@ pub struct System { pub message_dedup_cache: lru::LruCache<Id<MessageMarker>, Timestamp>, pub clients: HashMap<MemberName, Client>, pub latch_state: Option<(Member, Timestamp)>, + pub channel: (Sender<ClientEvent>, Receiver<ClientEvent>), + pub autoproxy_timeout: Option<Timestamp>, + pub gateway_channels: HashMap<MemberName, MessageSender>, } impl System { @@ -25,14 +29,15 @@ impl System { message_dedup_cache: lru::LruCache::new(NonZeroUsize::new(100).unwrap()), clients: HashMap::new(), latch_state: None, + autoproxy_timeout: None, + channel: channel::<ClientEvent>(100), + gateway_channels: HashMap::new(), } } pub async fn start_clients(&mut self) { println!("Starting clients for system {}", self.name); - let (tx, mut rx) = channel::<ClientEvent>(100); - let reference_user_id : Id<UserMarker> = Id::from_str(self.config.reference_user_id.as_str()) .expect(format!("Invalid user ID: {}", self.config.reference_user_id).as_str()); @@ -40,7 +45,7 @@ impl System { let client = twilight_http::Client::new(member.discord_token.clone()); self.clients.insert(member.name.clone(), client); - let tx = tx.clone(); + let tx = self.channel.0.clone(); let member = member.clone(); tokio::spawn(async move { let mut listener = Listener::new(member, reference_user_id); @@ -49,8 +54,11 @@ impl System { } loop { - match rx.recv().await { + match self.channel.1.recv().await { Some(event) => match event { + ClientEvent::Ready { client_name, send_channel } => { + self.gateway_channels.insert(client_name, send_channel); + }, ClientEvent::Message { event_time, message } => { if self.is_new_message(message.id, event_time) { self.handle_message(message, event_time).await; @@ -60,6 +68,14 @@ impl System { println!("Client ran into an error for system {}", self.name); return }, + ClientEvent::AutoproxyTimeout { last_message } => { + if let Some(current_last_message) = self.autoproxy_timeout { + if current_last_message == last_message { + self.latch_state = None; + self.update_presence().await; + } + } + }, }, None => { return @@ -108,6 +124,7 @@ impl System { if let Some((member, matched_content)) = match_prefix { self.proxy_message(&message, member, matched_content).await; self.update_autoproxy_state_after_message(member.clone(), timestamp); + self.update_presence().await; return } @@ -126,6 +143,7 @@ impl System { if time_since_last <= (*timeout_seconds).into() { self.proxy_message(&message, &member, message.content.as_str()).await; self.latch_state = Some((member.clone(), timestamp)); + self.update_presence().await; } } }, @@ -178,6 +196,84 @@ impl System { Some(AutoproxyConfig::Member { name }) => (), Some(AutoproxyConfig::Latch { scope, timeout_seconds, presence_indicator }) => { self.latch_state = Some((member.clone(), timestamp)); + self.autoproxy_timeout = Some(timestamp); + + let tx = self.channel.0.clone(); + let last_message = timestamp.clone(); + let timeout_seconds = timeout_seconds.clone(); + + tokio::spawn(async move { + sleep(Duration::from_secs(timeout_seconds.into())).await; + tx.send(ClientEvent::AutoproxyTimeout { last_message }) + .await.expect("Channel has closed"); + }); + } + } + } + + async fn update_presence(&mut self) { + match &self.config.autoproxy { + None => (), + Some(AutoproxyConfig::Member { name }) => { + for member in &self.config.members { + let gateway_channel = self.gateway_channels.get(&member.name).expect("No gateway shard for member"); + + println!("Updating {} to {}", member.name, if member.name == *name { "online" } else { "offline" }); + + gateway_channel.command(&UpdatePresence { + d: UpdatePresencePayload { + activities: Vec::new(), + afk: false, + since: None, + status: if member.name == *name { + Status::Online + } else { + Status::Invisible + }, + }, + op: OpCode::PresenceUpdate, + }).expect("Could not send command to gateway"); + } + }, + Some(AutoproxyConfig::Latch { scope, timeout_seconds, presence_indicator }) => { + if let Some((member, last_timestamp)) = &self.latch_state { + let name = &member.name; + for member in &self.config.members { + let gateway_channel = self.gateway_channels.get(&member.name).expect("No gateway shard for member"); + + println!("Updating {} to {}", member.name, if member.name == *name { "online" } else { "offline" }); + + gateway_channel.command(&UpdatePresence { + d: UpdatePresencePayload { + activities: Vec::new(), + afk: false, + since: None, + status: if member.name == *name { + Status::Online + } else { + Status::Invisible + }, + }, + op: OpCode::PresenceUpdate, + }).expect("Could not send command to gateway"); + } + } else { + for member in &self.config.members { + let gateway_channel = self.gateway_channels.get(&member.name).expect("No gateway shard for member"); + + println!("Updating {} to offline", member.name); + + gateway_channel.command(&UpdatePresence { + d: UpdatePresencePayload { + activities: Vec::new(), + afk: false, + since: None, + status: Status::Invisible, + }, + op: OpCode::PresenceUpdate, + }).expect("Could not send command to gateway"); + } + } } } } |