serenity/gateway/shard.rs
1use std::sync::Arc;
2use std::time::{Duration as StdDuration, Instant};
3
4use tokio::sync::Mutex;
5use tokio_tungstenite::tungstenite::error::Error as TungsteniteError;
6use tokio_tungstenite::tungstenite::protocol::frame::CloseFrame;
7use tracing::{debug, error, info, instrument, trace, warn};
8use url::Url;
9
10use super::{
11 ActivityData,
12 ChunkGuildFilter,
13 ConnectionStage,
14 GatewayError,
15 PresenceData,
16 ReconnectType,
17 ShardAction,
18 WsClient,
19};
20use crate::constants::{self, close_codes};
21use crate::internal::prelude::*;
22use crate::model::event::{Event, GatewayEvent};
23use crate::model::gateway::{GatewayIntents, ShardInfo};
24use crate::model::id::{ApplicationId, GuildId};
25use crate::model::user::OnlineStatus;
26
27/// A Shard is a higher-level handler for a websocket connection to Discord's gateway.
28///
29/// The shard allows for sending and receiving messages over the websocket, such as setting the
30/// active activity, reconnecting, syncing guilds, and more.
31///
32/// Refer to the [module-level documentation][module docs] for information on effectively using
33/// multiple shards, if you need to.
34///
35/// Note that there are additional methods available if you are manually managing a shard yourself,
36/// although they are hidden from the documentation since there are few use cases for doing such.
37///
38/// # Stand-alone shards
39///
40/// You may instantiate a shard yourself - decoupled from the [`Client`] - if you need to. For most
41/// use cases, you will not need to do this, and you can leave the client to do it.
42///
43/// This can be done by passing in the required parameters to [`Self::new`]. You can then manually
44/// handle the shard yourself.
45///
46/// **Note**: You _really_ do not need to do this. Just call one of the appropriate methods on the
47/// [`Client`].
48///
49/// # Examples
50///
51/// See the documentation for [`Self::new`] on how to use this.
52///
53/// [`Client`]: crate::Client
54/// [`receive`]: #method.receive
55/// [docs]: https://discord.com/developers/docs/topics/gateway#sharding
56/// [module docs]: crate::gateway#sharding
57pub struct Shard {
58 pub client: WsClient,
59 presence: PresenceData,
60 last_heartbeat_sent: Option<Instant>,
61 last_heartbeat_ack: Option<Instant>,
62 heartbeat_interval: Option<std::time::Duration>,
63 application_id_callback: Option<Box<dyn FnOnce(ApplicationId) + Send + Sync>>,
64 /// This is used by the heartbeater to determine whether the last heartbeat was sent without an
65 /// acknowledgement, and whether to reconnect.
66 // This must be set to `true` in `Shard::handle_event`'s `Ok(GatewayEvent::HeartbeatAck)` arm.
67 last_heartbeat_acknowledged: bool,
68 seq: u64,
69 session_id: Option<String>,
70 shard_info: ShardInfo,
71 stage: ConnectionStage,
72 /// Instant of when the shard was started.
73 // This acts as a timeout to determine if the shard has - for some reason - not started within
74 // a decent amount of time.
75 pub started: Instant,
76 pub token: String,
77 ws_url: Arc<Mutex<String>>,
78 pub intents: GatewayIntents,
79}
80
81impl Shard {
82 /// Instantiates a new instance of a Shard, bypassing the client.
83 ///
84 /// **Note**: You should likely never need to do this yourself.
85 ///
86 /// # Examples
87 ///
88 /// Instantiating a new Shard manually for a bot with no shards, and then listening for events:
89 ///
90 /// ```rust,no_run
91 /// use std::sync::Arc;
92 ///
93 /// use serenity::gateway::Shard;
94 /// use serenity::model::gateway::{GatewayIntents, ShardInfo};
95 /// use serenity::model::id::ShardId;
96 /// use tokio::sync::Mutex;
97 /// #
98 /// # use serenity::http::Http;
99 /// #
100 /// # async fn run() -> Result<(), Box<dyn std::error::Error>> {
101 /// # let http: Arc<Http> = unimplemented!();
102 /// let token = std::env::var("DISCORD_BOT_TOKEN")?;
103 /// let shard_info = ShardInfo {
104 /// id: ShardId(0),
105 /// total: 1,
106 /// };
107 ///
108 /// // retrieve the gateway response, which contains the URL to connect to
109 /// let gateway = Arc::new(Mutex::new(http.get_gateway().await?.url));
110 /// let shard = Shard::new(gateway, &token, shard_info, GatewayIntents::all(), None).await?;
111 ///
112 /// // at this point, you can create a `loop`, and receive events and match
113 /// // their variants
114 /// # Ok(())
115 /// # }
116 /// ```
117 ///
118 /// # Errors
119 ///
120 /// On Error, will return either [`Error::Gateway`], [`Error::Tungstenite`] or a Rustls/native
121 /// TLS error.
122 pub async fn new(
123 ws_url: Arc<Mutex<String>>,
124 token: &str,
125 shard_info: ShardInfo,
126 intents: GatewayIntents,
127 presence: Option<PresenceData>,
128 ) -> Result<Shard> {
129 let url = ws_url.lock().await.clone();
130 let client = connect(&url).await?;
131
132 let presence = presence.unwrap_or_default();
133 let last_heartbeat_sent = None;
134 let last_heartbeat_ack = None;
135 let heartbeat_interval = None;
136 let last_heartbeat_acknowledged = true;
137 let seq = 0;
138 let stage = ConnectionStage::Handshake;
139 let session_id = None;
140
141 Ok(Shard {
142 client,
143 presence,
144 last_heartbeat_sent,
145 last_heartbeat_ack,
146 heartbeat_interval,
147 application_id_callback: None,
148 last_heartbeat_acknowledged,
149 seq,
150 stage,
151 started: Instant::now(),
152 token: token.to_string(),
153 session_id,
154 shard_info,
155 ws_url,
156 intents,
157 })
158 }
159
160 /// Sets a callback to be called when the gateway receives the application's ID from Discord.
161 ///
162 /// Used internally by serenity to set the Http's internal application ID automatically.
163 pub fn set_application_id_callback(
164 &mut self,
165 callback: impl FnOnce(ApplicationId) + Send + Sync + 'static,
166 ) {
167 self.application_id_callback = Some(Box::new(callback));
168 }
169
170 /// Retrieves the current presence of the shard.
171 #[inline]
172 pub fn presence(&self) -> &PresenceData {
173 &self.presence
174 }
175
176 /// Retrieves the value of when the last heartbeat was sent.
177 #[inline]
178 pub fn last_heartbeat_sent(&self) -> Option<Instant> {
179 self.last_heartbeat_sent
180 }
181
182 /// Retrieves the value of when the last heartbeat ack was received.
183 #[inline]
184 pub fn last_heartbeat_ack(&self) -> Option<Instant> {
185 self.last_heartbeat_ack
186 }
187
188 /// Sends a heartbeat to the gateway with the current sequence.
189 ///
190 /// This sets the last heartbeat time to now, and [`Self::last_heartbeat_acknowledged`] to
191 /// `false`.
192 ///
193 /// # Errors
194 ///
195 /// Returns [`GatewayError::HeartbeatFailed`] if there was an error sending a heartbeat.
196 #[instrument(skip(self))]
197 pub async fn heartbeat(&mut self) -> Result<()> {
198 match self.client.send_heartbeat(&self.shard_info, Some(self.seq)).await {
199 Ok(()) => {
200 self.last_heartbeat_sent = Some(Instant::now());
201 self.last_heartbeat_acknowledged = false;
202
203 Ok(())
204 },
205 Err(why) => {
206 match why {
207 Error::Tungstenite(TungsteniteError::Io(err)) => {
208 if err.raw_os_error() != Some(32) {
209 debug!("[{:?}] Err heartbeating: {:?}", self.shard_info, err);
210 }
211 },
212 other => {
213 warn!("[{:?}] Other err w/ keepalive: {:?}", self.shard_info, other);
214 },
215 }
216
217 Err(Error::Gateway(GatewayError::HeartbeatFailed))
218 },
219 }
220 }
221
222 /// Returns the heartbeat interval dictated by Discord, if the Hello packet has been received.
223 #[inline]
224 pub fn heartbeat_interval(&self) -> Option<std::time::Duration> {
225 self.heartbeat_interval
226 }
227
228 #[inline]
229 pub fn last_heartbeat_acknowledged(&self) -> bool {
230 self.last_heartbeat_acknowledged
231 }
232
233 #[inline]
234 pub fn seq(&self) -> u64 {
235 self.seq
236 }
237
238 #[inline]
239 pub fn session_id(&self) -> Option<&String> {
240 self.session_id.as_ref()
241 }
242
243 #[inline]
244 #[instrument(skip(self))]
245 pub fn set_activity(&mut self, activity: Option<ActivityData>) {
246 self.presence.activity = activity;
247 }
248
249 #[inline]
250 #[instrument(skip(self))]
251 pub fn set_presence(&mut self, activity: Option<ActivityData>, status: OnlineStatus) {
252 self.set_activity(activity);
253 self.set_status(status);
254 }
255
256 #[inline]
257 #[instrument(skip(self))]
258 pub fn set_status(&mut self, mut status: OnlineStatus) {
259 if status == OnlineStatus::Offline {
260 status = OnlineStatus::Invisible;
261 }
262
263 self.presence.status = status;
264 }
265
266 /// Retrieves a copy of the current shard information.
267 ///
268 /// For example, if using 3 shards in total, and if this is shard 1, then it can be read as
269 /// "the second of three shards".
270 pub fn shard_info(&self) -> ShardInfo {
271 self.shard_info
272 }
273
274 /// Returns the current connection stage of the shard.
275 pub fn stage(&self) -> ConnectionStage {
276 self.stage
277 }
278
279 #[instrument(skip(self))]
280 fn handle_gateway_dispatch(&mut self, seq: u64, event: &Event) -> Option<ShardAction> {
281 if seq > self.seq + 1 {
282 warn!("[{:?}] Sequence off; them: {}, us: {}", self.shard_info, seq, self.seq);
283 }
284
285 match &event {
286 Event::Ready(ready) => {
287 debug!("[{:?}] Received Ready", self.shard_info);
288
289 self.session_id = Some(ready.ready.session_id.clone());
290 self.stage = ConnectionStage::Connected;
291
292 if let Some(callback) = self.application_id_callback.take() {
293 callback(ready.ready.application.id);
294 }
295 },
296 Event::Resumed(_) => {
297 info!("[{:?}] Resumed", self.shard_info);
298
299 self.stage = ConnectionStage::Connected;
300 self.last_heartbeat_acknowledged = true;
301 self.last_heartbeat_sent = Some(Instant::now());
302 self.last_heartbeat_ack = None;
303 },
304 _ => {},
305 }
306
307 self.seq = seq;
308
309 None
310 }
311
312 #[instrument(skip(self))]
313 fn handle_heartbeat_event(&mut self, s: u64) -> ShardAction {
314 info!("[{:?}] Received shard heartbeat", self.shard_info);
315
316 // Received seq is off -- attempt to resume.
317 if s > self.seq + 1 {
318 info!(
319 "[{:?}] Received off sequence (them: {}; us: {}); resuming",
320 self.shard_info, s, self.seq
321 );
322
323 if self.stage == ConnectionStage::Handshake {
324 self.stage = ConnectionStage::Identifying;
325
326 return ShardAction::Identify;
327 }
328 warn!("[{:?}] Heartbeat during non-Handshake; auto-reconnecting", self.shard_info);
329
330 return ShardAction::Reconnect(self.reconnection_type());
331 }
332
333 ShardAction::Heartbeat
334 }
335
336 #[instrument(skip(self))]
337 fn handle_gateway_closed(
338 &mut self,
339 data: Option<&CloseFrame<'static>>,
340 ) -> Result<Option<ShardAction>> {
341 let num = data.map(|d| d.code.into());
342 let clean = num == Some(1000);
343
344 match num {
345 Some(close_codes::UNKNOWN_OPCODE) => {
346 warn!("[{:?}] Sent invalid opcode.", self.shard_info);
347 },
348 Some(close_codes::DECODE_ERROR) => {
349 warn!("[{:?}] Sent invalid message.", self.shard_info);
350 },
351 Some(close_codes::NOT_AUTHENTICATED) => {
352 warn!("[{:?}] Sent no authentication.", self.shard_info);
353
354 return Err(Error::Gateway(GatewayError::NoAuthentication));
355 },
356 Some(close_codes::AUTHENTICATION_FAILED) => {
357 error!(
358 "[{:?}] Sent invalid authentication, please check the token.",
359 self.shard_info
360 );
361
362 return Err(Error::Gateway(GatewayError::InvalidAuthentication));
363 },
364 Some(close_codes::ALREADY_AUTHENTICATED) => {
365 warn!("[{:?}] Already authenticated.", self.shard_info);
366 },
367 Some(close_codes::INVALID_SEQUENCE) => {
368 warn!("[{:?}] Sent invalid seq: {}.", self.shard_info, self.seq);
369
370 self.seq = 0;
371 },
372 Some(close_codes::RATE_LIMITED) => {
373 warn!("[{:?}] Gateway ratelimited.", self.shard_info);
374 },
375 Some(close_codes::INVALID_SHARD) => {
376 warn!("[{:?}] Sent invalid shard data.", self.shard_info);
377
378 return Err(Error::Gateway(GatewayError::InvalidShardData));
379 },
380 Some(close_codes::SHARDING_REQUIRED) => {
381 error!("[{:?}] Shard has too many guilds.", self.shard_info);
382
383 return Err(Error::Gateway(GatewayError::OverloadedShard));
384 },
385 Some(4006 | close_codes::SESSION_TIMEOUT) => {
386 info!("[{:?}] Invalid session.", self.shard_info);
387
388 self.session_id = None;
389 },
390 Some(close_codes::INVALID_GATEWAY_INTENTS) => {
391 error!("[{:?}] Invalid gateway intents have been provided.", self.shard_info);
392
393 return Err(Error::Gateway(GatewayError::InvalidGatewayIntents));
394 },
395 Some(close_codes::DISALLOWED_GATEWAY_INTENTS) => {
396 error!("[{:?}] Disallowed gateway intents have been provided.", self.shard_info);
397
398 return Err(Error::Gateway(GatewayError::DisallowedGatewayIntents));
399 },
400 Some(other) if !clean => {
401 warn!(
402 "[{:?}] Unknown unclean close {}: {:?}",
403 self.shard_info,
404 other,
405 data.map(|d| &d.reason),
406 );
407 },
408 _ => {},
409 }
410
411 let resume = num
412 .map_or(true, |x| x != close_codes::AUTHENTICATION_FAILED && self.session_id.is_some());
413
414 Ok(Some(if resume {
415 ShardAction::Reconnect(ReconnectType::Resume)
416 } else {
417 ShardAction::Reconnect(ReconnectType::Reidentify)
418 }))
419 }
420
421 /// Handles an event from the gateway over the receiver, requiring the receiver to be passed if
422 /// a reconnect needs to occur.
423 ///
424 /// The best case scenario is that one of two values is returned:
425 /// - `Ok(None)`: a heartbeat, late hello, or session invalidation was received;
426 /// - `Ok(Some((event, None)))`: an op0 dispatch was received, and the shard's voice state will
427 /// be updated, _if_ the `voice` feature is enabled.
428 ///
429 /// # Errors
430 ///
431 /// Returns a [`GatewayError::InvalidAuthentication`] if invalid authentication was sent in the
432 /// IDENTIFY.
433 ///
434 /// Returns a [`GatewayError::InvalidShardData`] if invalid shard data was sent in the
435 /// IDENTIFY.
436 ///
437 /// Returns a [`GatewayError::NoAuthentication`] if no authentication was sent in the IDENTIFY.
438 ///
439 /// Returns a [`GatewayError::OverloadedShard`] if the shard would have too many guilds
440 /// assigned to it.
441 #[instrument(skip(self))]
442 pub fn handle_event(&mut self, event: &Result<GatewayEvent>) -> Result<Option<ShardAction>> {
443 match event {
444 Ok(GatewayEvent::Dispatch(seq, event)) => Ok(self.handle_gateway_dispatch(*seq, event)),
445 Ok(GatewayEvent::Heartbeat(s)) => Ok(Some(self.handle_heartbeat_event(*s))),
446 Ok(GatewayEvent::HeartbeatAck) => {
447 self.last_heartbeat_ack = Some(Instant::now());
448 self.last_heartbeat_acknowledged = true;
449
450 trace!("[{:?}] Received heartbeat ack", self.shard_info);
451
452 Ok(None)
453 },
454 &Ok(GatewayEvent::Hello(interval)) => {
455 debug!("[{:?}] Received a Hello; interval: {}", self.shard_info, interval);
456
457 if self.stage == ConnectionStage::Resuming {
458 return Ok(None);
459 }
460
461 self.heartbeat_interval = Some(std::time::Duration::from_millis(interval));
462
463 Ok(Some(if self.stage == ConnectionStage::Handshake {
464 ShardAction::Identify
465 } else {
466 debug!("[{:?}] Received late Hello; autoreconnecting", self.shard_info);
467
468 ShardAction::Reconnect(self.reconnection_type())
469 }))
470 },
471 &Ok(GatewayEvent::InvalidateSession(resumable)) => {
472 info!("[{:?}] Received session invalidation", self.shard_info);
473
474 Ok(Some(if resumable {
475 ShardAction::Reconnect(ReconnectType::Resume)
476 } else {
477 ShardAction::Reconnect(ReconnectType::Reidentify)
478 }))
479 },
480 Ok(GatewayEvent::Reconnect) => Ok(Some(ShardAction::Reconnect(ReconnectType::Resume))),
481 Err(Error::Gateway(GatewayError::Closed(data))) => {
482 self.handle_gateway_closed(data.as_ref())
483 },
484 Err(Error::Tungstenite(why)) => {
485 info!("[{:?}] Websocket error: {:?}", self.shard_info, why);
486 info!("[{:?}] Will attempt to auto-reconnect", self.shard_info);
487
488 Ok(Some(ShardAction::Reconnect(self.reconnection_type())))
489 },
490 Err(why) => {
491 warn!("[{:?}] Unhandled error: {:?}", self.shard_info, why);
492
493 Ok(None)
494 },
495 }
496 }
497
498 /// Does a heartbeat if needed. Returns false if something went wrong and the shard should be
499 /// restarted.
500 ///
501 /// `true` is returned under one of the following conditions:
502 /// - the heartbeat interval has not elapsed
503 /// - a heartbeat was successfully sent
504 /// - there is no known heartbeat interval yet
505 ///
506 /// `false` is returned under one of the following conditions:
507 /// - a heartbeat acknowledgement was not received in time
508 /// - an error occurred while heartbeating
509 #[instrument(skip(self))]
510 pub async fn do_heartbeat(&mut self) -> bool {
511 let Some(heartbeat_interval) = self.heartbeat_interval else {
512 // No Hello received yet
513 return self.started.elapsed() < StdDuration::from_secs(15);
514 };
515
516 // If a duration of time less than the heartbeat_interval has passed, then don't perform a
517 // keepalive or attempt to reconnect.
518 if let Some(last_sent) = self.last_heartbeat_sent {
519 if last_sent.elapsed() <= heartbeat_interval {
520 return true;
521 }
522 }
523
524 // If the last heartbeat didn't receive an acknowledgement, then auto-reconnect.
525 if !self.last_heartbeat_acknowledged {
526 debug!("[{:?}] Last heartbeat not acknowledged", self.shard_info,);
527
528 return false;
529 }
530
531 // Otherwise, we're good to heartbeat.
532 if let Err(why) = self.heartbeat().await {
533 warn!("[{:?}] Err heartbeating: {:?}", self.shard_info, why);
534
535 false
536 } else {
537 trace!("[{:?}] Heartbeat", self.shard_info);
538
539 true
540 }
541 }
542
543 /// Calculates the heartbeat latency between the shard and the gateway.
544 // Shamelessly stolen from brayzure's commit in eris:
545 // <https://github.com/abalabahaha/eris/commit/0ce296ae9a542bcec0edf1c999ee2d9986bed5a6>
546 #[instrument(skip(self))]
547 pub fn latency(&self) -> Option<StdDuration> {
548 if let (Some(sent), Some(received)) = (self.last_heartbeat_sent, self.last_heartbeat_ack) {
549 if received > sent {
550 return Some(received - sent);
551 }
552 }
553
554 None
555 }
556
557 /// Performs a deterministic reconnect.
558 ///
559 /// The type of reconnect is deterministic on whether a [`Self::session_id`].
560 ///
561 /// If the `session_id` still exists, then a RESUME is sent. If not, then an IDENTIFY is sent.
562 ///
563 /// Note that, if the shard is already in a stage of [`ConnectionStage::Connecting`], then no
564 /// action will be performed.
565 pub fn should_reconnect(&mut self) -> Option<ReconnectType> {
566 if self.stage == ConnectionStage::Connecting {
567 return None;
568 }
569
570 Some(self.reconnection_type())
571 }
572
573 pub fn reconnection_type(&self) -> ReconnectType {
574 if self.session_id().is_some() {
575 ReconnectType::Resume
576 } else {
577 ReconnectType::Reidentify
578 }
579 }
580
581 /// Requests that one or multiple [`Guild`]s be chunked.
582 ///
583 /// This will ask the gateway to start sending member chunks for large guilds (250 members+).
584 /// If a guild is over 250 members, then a full member list will not be downloaded, and must
585 /// instead be requested to be sent in "chunks" containing members.
586 ///
587 /// Member chunks are sent as the [`Event::GuildMembersChunk`] event. Each chunk only contains
588 /// a partial amount of the total members.
589 ///
590 /// If the `cache` feature is enabled, the cache will automatically be updated with member
591 /// chunks.
592 ///
593 /// # Examples
594 ///
595 /// Chunk a single guild by Id, limiting to 2000 [`Member`]s, and not
596 /// specifying a query parameter:
597 ///
598 /// ```rust,no_run
599 /// # use tokio::sync::Mutex;
600 /// # use serenity::gateway::{ChunkGuildFilter, Shard};
601 /// # use serenity::model::gateway::{GatewayIntents, ShardInfo};
602 /// # use serenity::model::id::ShardId;
603 /// # use std::sync::Arc;
604 /// #
605 /// # async fn run() -> Result<(), Box<dyn std::error::Error>> {
606 /// # let mutex = Arc::new(Mutex::new("".to_string()));
607 /// # let shard_info = ShardInfo {
608 /// # id: ShardId(0),
609 /// # total: 1,
610 /// # };
611 /// #
612 /// # let mut shard = Shard::new(mutex.clone(), "", shard_info, GatewayIntents::all(), None).await?;
613 /// #
614 /// use serenity::model::id::GuildId;
615 ///
616 /// shard.chunk_guild(GuildId::new(81384788765712384), Some(2000), false, ChunkGuildFilter::None, None).await?;
617 /// # Ok(())
618 /// # }
619 /// ```
620 ///
621 /// Chunk a single guild by Id, limiting to 20 members, and specifying a query parameter of
622 /// `"do"` and a nonce of `"request"`:
623 ///
624 /// ```rust,no_run
625 /// # use tokio::sync::Mutex;
626 /// # use serenity::model::gateway::{GatewayIntents, ShardInfo};
627 /// # use serenity::gateway::{ChunkGuildFilter, Shard};
628 /// # use serenity::model::id::ShardId;
629 /// # use std::error::Error;
630 /// # use std::sync::Arc;
631 /// #
632 /// # async fn run() -> Result<(), Box<dyn std::error::Error>> {
633 /// # let mutex = Arc::new(Mutex::new("".to_string()));
634 /// #
635 /// # let shard_info = ShardInfo {
636 /// # id: ShardId(0),
637 /// # total: 1,
638 /// # };
639 /// # let mut shard = Shard::new(mutex.clone(), "", shard_info, GatewayIntents::all(), None).await?;
640 /// #
641 /// use serenity::model::id::GuildId;
642 ///
643 /// shard
644 /// .chunk_guild(
645 /// GuildId::new(81384788765712384),
646 /// Some(20),
647 /// false,
648 /// ChunkGuildFilter::Query("do".to_owned()),
649 /// Some("request"),
650 /// )
651 /// .await?;
652 /// # Ok(())
653 /// # }
654 /// ```
655 ///
656 /// [`Event::GuildMembersChunk`]: crate::model::event::Event::GuildMembersChunk
657 /// [`Guild`]: crate::model::guild::Guild
658 /// [`Member`]: crate::model::guild::Member
659 #[instrument(skip(self))]
660 pub async fn chunk_guild(
661 &mut self,
662 guild_id: GuildId,
663 limit: Option<u16>,
664 presences: bool,
665 filter: ChunkGuildFilter,
666 nonce: Option<&str>,
667 ) -> Result<()> {
668 debug!("[{:?}] Requesting member chunks", self.shard_info);
669
670 self.client
671 .send_chunk_guild(guild_id, &self.shard_info, limit, presences, filter, nonce)
672 .await
673 }
674
675 /// Sets the shard as going into identifying stage, which sets:
676 /// - the time that the last heartbeat sent as being now
677 /// - the `stage` to [`ConnectionStage::Identifying`]
678 #[instrument(skip(self))]
679 pub async fn identify(&mut self) -> Result<()> {
680 self.client
681 .send_identify(&self.shard_info, &self.token, self.intents, &self.presence)
682 .await?;
683
684 self.last_heartbeat_sent = Some(Instant::now());
685 self.stage = ConnectionStage::Identifying;
686
687 Ok(())
688 }
689
690 /// Initializes a new WebSocket client.
691 ///
692 /// This will set the stage of the shard before and after instantiation of the client.
693 #[instrument(skip(self))]
694 pub async fn initialize(&mut self) -> Result<WsClient> {
695 debug!("[{:?}] Initializing.", self.shard_info);
696
697 // We need to do two, sort of three things here:
698 // - set the stage of the shard as opening the websocket connection
699 // - open the websocket connection
700 // - if successful, set the current stage as Handshaking
701 //
702 // This is used to accurately assess whether the state of the shard is accurate when a
703 // Hello is received.
704 self.stage = ConnectionStage::Connecting;
705 self.started = Instant::now();
706 let url = &self.ws_url.lock().await.clone();
707 let client = connect(url).await?;
708 self.stage = ConnectionStage::Handshake;
709
710 Ok(client)
711 }
712
713 #[instrument(skip(self))]
714 pub async fn reset(&mut self) {
715 self.last_heartbeat_sent = Some(Instant::now());
716 self.last_heartbeat_ack = None;
717 self.heartbeat_interval = None;
718 self.last_heartbeat_acknowledged = true;
719 self.session_id = None;
720 self.stage = ConnectionStage::Disconnected;
721 self.seq = 0;
722 }
723
724 #[instrument(skip(self))]
725 pub async fn resume(&mut self) -> Result<()> {
726 debug!("[{:?}] Attempting to resume", self.shard_info);
727
728 self.client = self.initialize().await?;
729 self.stage = ConnectionStage::Resuming;
730
731 match &self.session_id {
732 Some(session_id) => {
733 self.client.send_resume(&self.shard_info, session_id, self.seq, &self.token).await
734 },
735 None => Err(Error::Gateway(GatewayError::NoSessionId)),
736 }
737 }
738
739 #[instrument(skip(self))]
740 pub async fn reconnect(&mut self) -> Result<()> {
741 info!("[{:?}] Attempting to reconnect", self.shard_info());
742
743 self.reset().await;
744 self.client = self.initialize().await?;
745
746 Ok(())
747 }
748
749 #[instrument(skip(self))]
750 pub async fn update_presence(&mut self) -> Result<()> {
751 self.client.send_presence_update(&self.shard_info, &self.presence).await
752 }
753}
754
755async fn connect(base_url: &str) -> Result<WsClient> {
756 let url =
757 Url::parse(&format!("{base_url}?v={}", constants::GATEWAY_VERSION)).map_err(|why| {
758 warn!("Error building gateway URL with base `{}`: {:?}", base_url, why);
759
760 Error::Gateway(GatewayError::BuildingUrl)
761 })?;
762
763 WsClient::connect(url).await
764}