diff options
Diffstat (limited to 'src/system')
-rw-r--r-- | src/system/aggregator.rs | 73 | ||||
-rw-r--r-- | src/system/bot.rs | 317 | ||||
-rw-r--r-- | src/system/bot/client.rs | 187 | ||||
-rw-r--r-- | src/system/bot/gateway.rs | 123 | ||||
-rw-r--r-- | src/system/bot/mod.rs | 80 | ||||
-rw-r--r-- | src/system/mod.rs | 31 | ||||
-rw-r--r-- | src/system/types.rs | 13 |
7 files changed, 469 insertions, 355 deletions
diff --git a/src/system/aggregator.rs b/src/system/aggregator.rs index 0177249..6873272 100644 --- a/src/system/aggregator.rs +++ b/src/system/aggregator.rs @@ -1,14 +1,15 @@ use lru::LruCache; use std::num::NonZeroUsize; use tokio::sync::mpsc::{channel, Receiver, Sender}; +use twilight_model::channel::Message as TwiMessage; -use super::{MessageEvent, MessageId, SystemEvent}; +use super::{Message as GatewayMessage, MessageEvent, MessageId, SystemEvent}; pub struct MessageAggregator { rx: Receiver<MessageEvent>, tx: Sender<MessageEvent>, - message_cache: lru::LruCache<MessageId, MessageEvent>, - emitter: Option<Sender<SystemEvent>>, + message_cache: lru::LruCache<MessageId, TwiMessage>, + system_emitter: Option<Sender<SystemEvent>>, } impl MessageAggregator { @@ -19,7 +20,7 @@ impl MessageAggregator { tx, rx, message_cache: LruCache::new(NonZeroUsize::new(100).unwrap()), - emitter: None, + system_emitter: None, } } @@ -27,31 +28,61 @@ impl MessageAggregator { self.tx.clone() } - pub fn set_handler(&mut self, emitter: Sender<SystemEvent>) -> () { - self.emitter = Some(emitter); + pub fn set_system_handler(&mut self, emitter: Sender<SystemEvent>) -> () { + self.system_emitter = Some(emitter); } pub fn start(mut self) -> () { tokio::spawn(async move { loop { match self.rx.recv().await { - None => return, + None => (), Some((timestamp, message)) => { - let last_seen_timestamp = self.message_cache.get(&message.id); - let current_timestamp = timestamp; - - if last_seen_timestamp.is_none() - || last_seen_timestamp.unwrap().0.as_micros() - < current_timestamp.as_micros() - { - self.message_cache - .put(message.id, (timestamp, message.clone())); - - if let Some(emitter) = &self.emitter { - emitter - .send(SystemEvent::NewMessage((timestamp, message))) + let system_emitter = &self.system_emitter.clone().expect("No system emitter"); + match message { + GatewayMessage::Partial(current_partial, member_id) => { + match self.message_cache.get(¤t_partial.id) { + Some(original_message) => { + + let mut updated_message = original_message.clone(); + if let Some(edited_time) = current_partial.edited_timestamp { + updated_message.edited_timestamp = Some(edited_time); + } + + if let Some(content) = current_partial.content { + updated_message.content = content + } + + self.tx.send((timestamp, GatewayMessage::Complete(updated_message))).await; + }, + None => { + system_emitter.send( + SystemEvent::RefetchMessage(member_id, current_partial.id, current_partial.channel_id) + ).await; + }, + }; + }, + GatewayMessage::Complete(message) => { + let previous_message = self.message_cache.get(&message.id); + + if let Some(previous_message) = previous_message.cloned() { + let previous_timestamp = previous_message.edited_timestamp.unwrap_or(previous_message.timestamp); + let current_timestamp = message.edited_timestamp.unwrap_or(message.timestamp); + + // Should we skip sending + if previous_timestamp.as_micros() >= current_timestamp.as_micros() { + continue + } + + // If not, fall through to update stored message + } + + self.message_cache.put(message.id, message.clone()); + + self.system_emitter.as_ref().expect("Aggregator has no system emitter") + .send(SystemEvent::NewMessage(timestamp, message)) .await; - } + }, }; } } diff --git a/src/system/bot.rs b/src/system/bot.rs deleted file mode 100644 index 8014b60..0000000 --- a/src/system/bot.rs +++ /dev/null @@ -1,317 +0,0 @@ -use std::sync::Arc; -use tokio::sync::mpsc::Sender; -use tokio::sync::Mutex; -use futures::future::join_all; -use twilight_gateway::{Intents, Shard, ShardId}; -use twilight_http::{request::channel::reaction::RequestReactionType, Client}; -use twilight_http::error::Error as TwError; -use twilight_model::gateway::{ - payload::outgoing::{update_presence::UpdatePresencePayload, UpdatePresence}, - OpCode, -}; -use twilight_model::{ - channel, - channel::{ - message::{AllowedMentions, MentionType, MessageType}, - Message, - }, - id::{marker::UserMarker, Id}, -}; -use twilight_model::http::attachment::Attachment; - -use super::{MemberId, MessageEvent, Status, SystemEvent, UserId, MessageId, ChannelId}; - -#[derive(Clone)] -pub struct Bot { - member_id: MemberId, - reference_user_id: UserId, - last_presence: Arc<Mutex<Status>>, - message_handler: Option<Sender<MessageEvent>>, - system_handler: Option<Sender<SystemEvent>>, - shard: Arc<Mutex<Shard>>, - client: Arc<Mutex<Client>>, -} - -impl Bot { - pub fn new( - member_id: MemberId, - config: &crate::config::Member, - reference_user_id: UserId, - ) -> Self { - let intents = Intents::GUILD_MEMBERS - | Intents::GUILD_PRESENCES - | Intents::GUILD_MESSAGES - | Intents::MESSAGE_CONTENT; - - Self { - member_id, - reference_user_id, - last_presence: Arc::new(Mutex::new(Status::Online)), - message_handler: None, - system_handler: None, - shard: Arc::new(Mutex::new(Shard::new( - ShardId::ONE, - config.discord_token.clone(), - intents, - ))), - client: Arc::new(Mutex::new(Client::new(config.discord_token.clone()))), - } - } - - pub fn set_message_handler(&mut self, handler: Sender<MessageEvent>) { - self.message_handler = Some(handler); - } - - pub fn set_system_handler(&mut self, handler: Sender<SystemEvent>) { - self.system_handler = Some(handler); - } - - pub async fn set_status(&self, status: Status) { - let mut last_status = self.last_presence.lock().await; - - if status == *last_status { - return - } - - let mut shard = self.shard.lock().await; - - shard - .command(&UpdatePresence { - d: UpdatePresencePayload { - activities: Vec::new(), - afk: false, - since: None, - status, - }, - op: OpCode::PresenceUpdate, - }) - .await - .expect("Could not send command to gateway"); - - *last_status = status; - } - - pub async fn delete_message(&self, channel_id: ChannelId, message_id: MessageId) -> Result<(), TwError> { - let client = self.client.lock().await; - let delete_result = client.delete_message(channel_id, message_id).await; - - match delete_result { - Err(err) => { - match &err.kind() { - twilight_http::error::ErrorType::Response { body: _, error, status: _ } => match error { - twilight_http::api_error::ApiError::General(err) => { - // Code for "Missing Permissions": https://discord.com/developers/docs/topics/opcodes-and-status-codes#json-json-error-codes - if err.code == 50013 { - println!("ERROR: Client {} doesn't have permissions to delete message", self.member_id); - let _ = client.create_reaction( - channel_id, - message_id, - &RequestReactionType::Unicode { name: "🔐" } - ).await; - } - }, - _ => (), - }, - _ => (), - }; - - Err(err) - }, - _ => Ok(()), - } - } - - pub async fn duplicate_message(&self, message: &Message, content: &str) -> Result<channel::Message, MessageDuplicateError> { - let client = self.client.lock().await; - - let mut create_message = client.create_message(message.channel_id).content(content)?; - - let mut allowed_mentions = AllowedMentions { - parse: Vec::new(), - replied_user: false, - roles: message.mention_roles.clone(), - users: message.mentions.iter().map(|user| user.id).collect(), - }; - - if message.mention_everyone { - allowed_mentions.parse.push(MentionType::Everyone); - } - - if message.kind == MessageType::Reply { - if let Some(ref_message) = message.referenced_message.as_ref() { - create_message = create_message.reply(ref_message.id); - - let pings_referenced_author = message - .mentions - .iter() - .any(|user| user.id == ref_message.author.id); - - if pings_referenced_author { - allowed_mentions.replied_user = true; - } else { - allowed_mentions.replied_user = false; - } - } else { - panic!("Cannot proxy message: Was reply but no referenced message"); - } - } - - let attachments = join_all(message.attachments.iter().map(|attachment| async { - let filename = attachment.filename.clone(); - let description_opt = attachment.description.clone(); - let bytes = reqwest::get(attachment.proxy_url.clone()) - .await? - .bytes() - .await?; - let mut new_attachment = - Attachment::from_bytes(filename, bytes.try_into().unwrap(), attachment.id.into()); - - if let Some(description) = description_opt { - new_attachment.description(description); - } - - Ok(new_attachment) - })) - .await - .iter() - .filter_map( - |result: &Result<Attachment, MessageDuplicateError>| match result { - Ok(attachment) => Some(attachment.clone()), - Err(_) => None, - }, - ) - .collect::<Vec<_>>(); - - if attachments.len() > 0 { - create_message = create_message.attachments(attachments.as_slice())?; - } - - if let Some(flags) = message.flags { - create_message = create_message.flags(flags); - } - - create_message = create_message.allowed_mentions(Some(&allowed_mentions)); - let new_message = create_message.await?.model().await?; - - Ok(new_message) - } - - pub fn start_listening(&self) { - self.clone().start_listen_task() - } - - fn start_listen_task(self) { - tokio::spawn(async move { - loop { - let next_event = { self.shard.lock().await.next_event().await }; - - match next_event { - Err(source) => { - if let Some(channel) = &self.system_handler { - channel - .send(SystemEvent::GatewayError(self.member_id, source.to_string())) - .await; - - if source.is_fatal() { - channel.send(SystemEvent::GatewayClosed(self.member_id)).await; - break; - } - } - } - Ok(event) => match event { - twilight_gateway::Event::Ready(_) => { - if let Some(channel) = &self.system_handler { - channel - .send(SystemEvent::GatewayConnected(self.member_id)) - .await; - } - } - - twilight_gateway::Event::MessageCreate(message_create) => { - let message = message_create.0; - - if message.author.id != self.reference_user_id { - continue; - } - - if let Some(channel) = &self.message_handler { - channel - .send((message.timestamp, message)) - .await; - } - } - - twilight_gateway::Event::MessageUpdate(message_update) => { - if message_update.author.is_none() - || message_update.author.as_ref().unwrap().id != self.reference_user_id - { - continue; - } - - if message_update.edited_timestamp.is_none() { - println!("Message update but no edit timestamp"); - continue; - } - - if message_update.content.is_none() { - println!("Message update but no content"); - continue; - } - - let message = self.client - .lock() - .await - .message(message_update.channel_id, message_update.id) - .await - .expect("Could not load message") - .model() - .await - .expect("Could not deserialize message"); - - if let Some(channel) = &self.message_handler { - channel - .send((message_update.edited_timestamp.unwrap(), message)) - .await; - } - } - - _ => (), - }, - } - } - }); - } -} - - -#[derive(Debug)] -pub enum MessageDuplicateError { - MessageValidation(twilight_validate::message::MessageValidationError), - AttachmentRequest(reqwest::Error), - MessageCreate(twilight_http::error::Error), - ResponseDeserialization(twilight_http::response::DeserializeBodyError), -} - -impl From<twilight_validate::message::MessageValidationError> for MessageDuplicateError { - fn from(value: twilight_validate::message::MessageValidationError) -> Self { - MessageDuplicateError::MessageValidation(value) - } -} - -impl From<reqwest::Error> for MessageDuplicateError { - fn from(value: reqwest::Error) -> Self { - MessageDuplicateError::AttachmentRequest(value) - } -} - -impl From<twilight_http::error::Error> for MessageDuplicateError { - fn from(value: twilight_http::error::Error) -> Self { - MessageDuplicateError::MessageCreate(value) - } -} - -impl From<twilight_http::response::DeserializeBodyError> for MessageDuplicateError { - fn from(value: twilight_http::response::DeserializeBodyError) -> Self { - MessageDuplicateError::ResponseDeserialization(value) - } -} diff --git a/src/system/bot/client.rs b/src/system/bot/client.rs new file mode 100644 index 0000000..c55759a --- /dev/null +++ b/src/system/bot/client.rs @@ -0,0 +1,187 @@ +use std::sync::Arc; +use futures::future::join_all; +use tokio::sync::RwLock; +use tokio::sync::Mutex; +use twilight_http::client::Client as TwiClient; +use twilight_http::error::Error as TwiError; +use twilight_http::request::channel::reaction::RequestReactionType; +use twilight_model::channel::message::{AllowedMentions, MentionType, MessageType}; +use twilight_model::http::attachment::Attachment; + +use super::*; + +pub struct Client { + client: Arc<Mutex<TwiClient>>, + bot_conf: Arc<RwLock<BotConfig>>, +} + +impl Client { + pub fn new(discord_token: &String, bot_conf: &Arc<RwLock<BotConfig>>) -> Self { + Self { + client: Arc::new(Mutex::new(TwiClient::new(discord_token.clone()))), + bot_conf: bot_conf.clone(), + } + } + + pub async fn refetch_message(&self, message_id: MessageId, channel_id: ChannelId) { + let client = self.client.lock().await; + let bot_conf = self.bot_conf.read().await; + let message_channel = bot_conf.message_handler.as_ref().expect("No message handler"); + + let message = client + .message(channel_id, message_id) + .await + .expect("Could not load message") + .model() + .await + .expect("Could not deserialize message"); + + let timestamp = if message.edited_timestamp.is_some() { + message.edited_timestamp.unwrap() + } else { + message.timestamp + }; + + message_channel + .send((timestamp, Message::Complete(message))) + .await; + } + + pub async fn delete_message(&self, channel_id: ChannelId, message_id: MessageId) -> Result<(), TwiError> { + let client = self.client.lock().await; + let delete_result = client.delete_message(channel_id, message_id).await; + let member_id = self.bot_conf.read().await.member_id; + + match delete_result { + Err(err) => { + match &err.kind() { + twilight_http::error::ErrorType::Response { body: _, error, status: _ } => match error { + twilight_http::api_error::ApiError::General(err) => { + // Code for "Missing Permissions": https://discord.com/developers/docs/topics/opcodes-and-status-codes#json-json-error-codes + if err.code == 50013 { + println!("ERROR: Client {} doesn't have permissions to delete message", member_id); + let _ = client.create_reaction( + channel_id, + message_id, + &RequestReactionType::Unicode { name: "🔐" } + ).await; + } + }, + _ => (), + }, + _ => (), + }; + + Err(err) + }, + _ => Ok(()), + } + } + + pub async fn duplicate_message(&self, message: &TwiMessage, content: &str) -> Result<TwiMessage, MessageDuplicateError> { + let client = self.client.lock().await; + + let mut create_message = client.create_message(message.channel_id).content(content)?; + + let mut allowed_mentions = AllowedMentions { + parse: Vec::new(), + replied_user: false, + roles: message.mention_roles.clone(), + users: message.mentions.iter().map(|user| user.id).collect(), + }; + + if message.mention_everyone { + allowed_mentions.parse.push(MentionType::Everyone); + } + + if message.kind == MessageType::Reply { + if let Some(ref_message) = message.referenced_message.as_ref() { + create_message = create_message.reply(ref_message.id); + + let pings_referenced_author = message + .mentions + .iter() + .any(|user| user.id == ref_message.author.id); + + if pings_referenced_author { + allowed_mentions.replied_user = true; + } else { + allowed_mentions.replied_user = false; + } + } else { + panic!("Cannot proxy message: Was reply but no referenced message"); + } + } + + let attachments = join_all(message.attachments.iter().map(|attachment| async { + let filename = attachment.filename.clone(); + let description_opt = attachment.description.clone(); + let bytes = reqwest::get(attachment.proxy_url.clone()) + .await? + .bytes() + .await?; + let mut new_attachment = + Attachment::from_bytes(filename, bytes.try_into().unwrap(), attachment.id.into()); + + if let Some(description) = description_opt { + new_attachment.description(description); + } + + Ok(new_attachment) + })) + .await + .iter() + .filter_map( + |result: &Result<Attachment, MessageDuplicateError>| match result { + Ok(attachment) => Some(attachment.clone()), + Err(_) => None, + }, + ) + .collect::<Vec<_>>(); + + if attachments.len() > 0 { + create_message = create_message.attachments(attachments.as_slice())?; + } + + if let Some(flags) = message.flags { + create_message = create_message.flags(flags); + } + + create_message = create_message.allowed_mentions(Some(&allowed_mentions)); + let new_message = create_message.await?.model().await?; + + Ok(new_message) + } +} + +#[derive(Debug)] +pub enum MessageDuplicateError { + MessageValidation(twilight_validate::message::MessageValidationError), + AttachmentRequest(reqwest::Error), + MessageCreate(twilight_http::error::Error), + ResponseDeserialization(twilight_http::response::DeserializeBodyError), +} + +impl From<twilight_validate::message::MessageValidationError> for MessageDuplicateError { + fn from(value: twilight_validate::message::MessageValidationError) -> Self { + MessageDuplicateError::MessageValidation(value) + } +} + +impl From<reqwest::Error> for MessageDuplicateError { + fn from(value: reqwest::Error) -> Self { + MessageDuplicateError::AttachmentRequest(value) + } +} + +impl From<twilight_http::error::Error> for MessageDuplicateError { + fn from(value: twilight_http::error::Error) -> Self { + MessageDuplicateError::MessageCreate(value) + } +} + +impl From<twilight_http::response::DeserializeBodyError> for MessageDuplicateError { + fn from(value: twilight_http::response::DeserializeBodyError) -> Self { + MessageDuplicateError::ResponseDeserialization(value) + } +} diff --git a/src/system/bot/gateway.rs b/src/system/bot/gateway.rs new file mode 100644 index 0000000..4a83086 --- /dev/null +++ b/src/system/bot/gateway.rs @@ -0,0 +1,123 @@ +use std::sync::Arc; +use tokio::sync::RwLock; +use tokio::sync::Mutex; +use twilight_model::gateway::OpCode; +use twilight_model::gateway::payload::outgoing::{update_presence::UpdatePresencePayload, UpdatePresence}; +use twilight_gateway::{ + Intents, Shard, ShardId, +}; + +use super::{Message, Status, SystemEvent, BotConfig}; + +pub struct Gateway { + shard: Arc<Mutex<Shard>>, + bot_conf: Arc<RwLock<BotConfig>>, +} + +impl Gateway { + pub fn new(discord_token: &String, bot_conf: &Arc<RwLock<BotConfig>>) -> Self { + let intents = Intents::GUILD_MEMBERS + | Intents::GUILD_PRESENCES + | Intents::GUILD_MESSAGES + | Intents::MESSAGE_CONTENT; + + Self { + shard: Arc::new(Mutex::new(Shard::new( + ShardId::ONE, + discord_token.clone(), + intents, + ))), + bot_conf: bot_conf.clone(), + } + } + + pub async fn set_status(&self, status: Status) { + { + let last_status = { (*self.bot_conf.read().await).last_status }; + + if status == last_status { + return + } + } + + + { + let mut shard = self.shard.lock().await; + + shard.command(&UpdatePresence { + d: UpdatePresencePayload { + activities: Vec::new(), + afk: false, + since: None, + status, + }, + op: OpCode::PresenceUpdate, + }).await.expect("Could not send command to gateway"); + } + + self.bot_conf.write().await.last_status = status; + } + + pub fn start_listening(&self) { + let bot_conf = self.bot_conf.clone(); + let shard = self.shard.clone(); + tokio::spawn(async move { + loop { + let bot_conf = { (*bot_conf.read().await).clone() }; + let next_event = { shard.lock().await.next_event().await }; + let system_channel = bot_conf.system_handler.as_ref().expect("No system channel"); + let message_channel = bot_conf.message_handler.as_ref().expect("No message channel"); + + match next_event { + Err(source) => { + system_channel + .send(SystemEvent::GatewayError(bot_conf.member_id, source.to_string())) + .await; + + if source.is_fatal() { + system_channel.send(SystemEvent::GatewayClosed(bot_conf.member_id)).await; + break; + } + } + Ok(event) => match event { + twilight_gateway::Event::Ready(_) => { + system_channel + .send(SystemEvent::GatewayConnected(bot_conf.member_id)) + .await; + } + + twilight_gateway::Event::MessageCreate(message_create) => { + let message = message_create.0; + + if message.author.id != bot_conf.reference_user_id { + continue; + } + + message_channel + .send((message.timestamp, Message::Complete(message))) + .await; + } + + twilight_gateway::Event::MessageUpdate(message_update) => { + if message_update.author.is_none() + || message_update.author.as_ref().unwrap().id != bot_conf.reference_user_id + { + continue; + } + + if message_update.edited_timestamp.is_none() || message_update.content.is_none() { + continue; + } + + message_channel + .send((message_update.edited_timestamp.unwrap(), Message::Partial(*message_update, bot_conf.member_id))) + .await; + } + + _ => (), + }, + }; + } + }); + } +} diff --git a/src/system/bot/mod.rs b/src/system/bot/mod.rs new file mode 100644 index 0000000..6bf8d78 --- /dev/null +++ b/src/system/bot/mod.rs @@ -0,0 +1,80 @@ +mod client; +mod gateway; + +use std::sync::Arc; +use tokio::sync::mpsc::Sender; +use tokio::sync::RwLock; +use twilight_http::error::Error as TwiError; + +pub use super::types::*; +pub use client::MessageDuplicateError; +use gateway::Gateway; +use client::Client; + +#[derive(Clone)] +pub struct BotConfig { + pub member_id: MemberId, + pub reference_user_id: UserId, + pub discord_token: String, + pub last_status: Status, + pub message_handler: Option<Sender<MessageEvent>>, + pub system_handler: Option<Sender<SystemEvent>>, +} + +pub struct Bot { + bot_conf: Arc<RwLock<BotConfig>>, + gateway: Gateway, + client: Client, +} + +impl Bot { + pub fn new( + member_id: MemberId, + config: &crate::config::Member, + reference_user_id: UserId, + ) -> Self { + let bot_conf = Arc::new(RwLock::new(BotConfig { + member_id, + reference_user_id, + discord_token: config.discord_token.clone(), + last_status: Status::Online, + message_handler: None, + system_handler: None, + })); + + Self { + gateway: Gateway::new(&config.discord_token, &bot_conf), + client: Client::new(&config.discord_token, &bot_conf), + bot_conf, + } + } + + pub async fn set_message_handler(&mut self, handler: Sender<MessageEvent>) { + self.bot_conf.write().await.message_handler = Some(handler); + } + + pub async fn set_system_handler(&mut self, handler: Sender<SystemEvent>) { + self.bot_conf.write().await.system_handler = Some(handler); + } + + pub async fn set_status(&self, status: Status) { + self.gateway.set_status(status).await; + } + + pub fn start(&self) { + self.gateway.start_listening() + } + + pub async fn refetch_message(&self, message_id: MessageId, channel_id: ChannelId) { + self.client.refetch_message(message_id, channel_id).await; + } + + pub async fn delete_message(&self, channel_id: ChannelId, message_id: MessageId) -> Result<(), TwiError> { + self.client.delete_message(channel_id, message_id).await + } + + pub async fn duplicate_message(&self, message_id: &TwiMessage, content: &str) -> Result<TwiMessage, MessageDuplicateError> { + self.client.duplicate_message(message_id, content).await + } +} + diff --git a/src/system/mod.rs b/src/system/mod.rs index 4179e3b..bd390b7 100644 --- a/src/system/mod.rs +++ b/src/system/mod.rs @@ -4,12 +4,7 @@ use tokio::{ sync::mpsc::{channel, Sender}, time::sleep, }; -use twilight_model::{ - channel::{ - Message, - }, - id::{marker::UserMarker, Id}, -}; +use twilight_model::id::{marker::UserMarker, Id}; use twilight_model::util::Timestamp; use crate::config::{AutoproxyConfig, AutoproxyLatchScope, Member}; @@ -21,7 +16,6 @@ use aggregator::MessageAggregator; use bot::Bot; pub use types::*; -use self::bot::MessageDuplicateError; pub struct Manager { pub name: String, @@ -72,17 +66,17 @@ impl Manager { let (system_sender, mut system_receiver) = channel::<SystemEvent>(100); self.system_sender = Some(system_sender.clone()); let mut aggregator = MessageAggregator::new(); - aggregator.set_handler(system_sender.clone()); + aggregator.set_system_handler(system_sender.clone()); for (member_id, member) in self.config.members.iter().enumerate() { // Create gateway listener let mut bot = Bot::new(member_id, &member, reference_user_id); - bot.set_message_handler(aggregator.get_sender()); - bot.set_system_handler(system_sender.clone()); + bot.set_message_handler(aggregator.get_sender()).await; + bot.set_system_handler(system_sender.clone()).await; // Start gateway listener - bot.start_listening(); + bot.start(); self.bots.insert(member_id, bot); } @@ -122,9 +116,13 @@ impl Manager { num_connected -= 1; } - SystemEvent::NewMessage((event_time, message)) => { + SystemEvent::NewMessage(event_time, message) => { self.handle_message(message, event_time).await; } + SystemEvent::RefetchMessage(member_id, message_id, channel_id) => { + let bot = self.bots.get(&member_id).expect("No bot"); + bot.refetch_message(message_id, channel_id).await; + } SystemEvent::GatewayError(member_id, message) => { let member = self .find_member_by_id(member_id) @@ -155,7 +153,7 @@ impl Manager { } } - async fn handle_message(&mut self, message: Message, timestamp: Timestamp) { + async fn handle_message(&mut self, message: TwiMessage, timestamp: Timestamp) { // TODO: Commands if message.content.eq("!panic") { panic!("Exiting due to user command"); @@ -234,12 +232,13 @@ impl Manager { } } - async fn proxy_message(&self, message: &Message, member: MemberId, content: &str) -> Result<(), ()> { + async fn proxy_message(&self, message: &TwiMessage, member: MemberId, content: &str) -> Result<(), ()> { let bot = self.bots.get(&member).expect("No client for member"); let duplicate_result = bot.duplicate_message(message, content).await; if duplicate_result.is_err() { + println!("Could not copy message: {:?}", duplicate_result); return Err(()) } @@ -247,6 +246,8 @@ impl Manager { let delete_result = bot.delete_message(message.channel_id, message.id).await; if delete_result.is_err() { + println!("Could not delete message: {:?}", delete_result); + // Delete the duplicated message if that failed let _ = bot.delete_message(message.channel_id, duplicate_result.unwrap().id).await; return Err(()) @@ -339,7 +340,7 @@ impl Manager { } impl crate::config::Member { - pub fn matches_proxy_prefix<'a>(&self, message: &'a Message) -> Option<&'a str> { + pub fn matches_proxy_prefix<'a>(&self, message: &'a TwiMessage) -> Option<&'a str> { match self.message_pattern.captures(message.content.as_str()) { None => None, Some(captures) => match captures.name("content") { diff --git a/src/system/types.rs b/src/system/types.rs index e8d14a6..41cb10d 100644 --- a/src/system/types.rs +++ b/src/system/types.rs @@ -1,4 +1,5 @@ -use twilight_model::channel::Message; +pub use twilight_model::channel::Message as TwiMessage; +use twilight_model::gateway::payload::incoming::MessageUpdate as PartialMessage; use twilight_model::id::marker::{ChannelMarker, MessageMarker, UserMarker}; use twilight_model::id::Id; use twilight_model::util::Timestamp; @@ -7,9 +8,16 @@ pub type MemberId = usize; pub type MessageId = Id<MessageMarker>; pub type ChannelId = Id<ChannelMarker>; pub type UserId = Id<UserMarker>; +pub type FullMessage = TwiMessage; pub type Status = twilight_model::gateway::presence::Status; +#[derive(Clone)] +pub enum Message { + Complete(FullMessage), + Partial(PartialMessage, MemberId), +} + pub type MessageEvent = (Timestamp, Message); pub type ReactionEvent = (Timestamp, ()); pub type CommandEvent = (Timestamp, ()); @@ -20,9 +28,10 @@ pub enum SystemEvent { GatewayError(MemberId, String), GatewayClosed(MemberId), AllGatewaysConnected, + RefetchMessage(MemberId, MessageId, ChannelId), // User event handling - NewMessage(MessageEvent), + NewMessage(Timestamp, FullMessage), EditedMessage(MessageEvent), NewReaction(ReactionEvent), |