serenity/gateway/
shard.rs

1use std::sync::Arc;
2use std::time::{Duration as StdDuration, Instant};
3
4use tokio::sync::Mutex;
5use tokio_tungstenite::tungstenite::error::Error as TungsteniteError;
6use tokio_tungstenite::tungstenite::protocol::frame::CloseFrame;
7use tracing::{debug, error, info, instrument, trace, warn};
8use url::Url;
9
10use super::{
11    ActivityData,
12    ChunkGuildFilter,
13    ConnectionStage,
14    GatewayError,
15    PresenceData,
16    ReconnectType,
17    ShardAction,
18    WsClient,
19};
20use crate::constants::{self, close_codes};
21use crate::internal::prelude::*;
22use crate::model::event::{Event, GatewayEvent};
23use crate::model::gateway::{GatewayIntents, ShardInfo};
24use crate::model::id::{ApplicationId, GuildId};
25use crate::model::user::OnlineStatus;
26
27/// A Shard is a higher-level handler for a websocket connection to Discord's gateway.
28///
29/// The shard allows for sending and receiving messages over the websocket, such as setting the
30/// active activity, reconnecting, syncing guilds, and more.
31///
32/// Refer to the [module-level documentation][module docs] for information on effectively using
33/// multiple shards, if you need to.
34///
35/// Note that there are additional methods available if you are manually managing a shard yourself,
36/// although they are hidden from the documentation since there are few use cases for doing such.
37///
38/// # Stand-alone shards
39///
40/// You may instantiate a shard yourself - decoupled from the [`Client`] - if you need to. For most
41/// use cases, you will not need to do this, and you can leave the client to do it.
42///
43/// This can be done by passing in the required parameters to [`Self::new`]. You can then manually
44/// handle the shard yourself.
45///
46/// **Note**: You _really_ do not need to do this. Just call one of the appropriate methods on the
47/// [`Client`].
48///
49/// # Examples
50///
51/// See the documentation for [`Self::new`] on how to use this.
52///
53/// [`Client`]: crate::Client
54/// [`receive`]: #method.receive
55/// [docs]: https://discord.com/developers/docs/topics/gateway#sharding
56/// [module docs]: crate::gateway#sharding
57pub struct Shard {
58    pub client: WsClient,
59    presence: PresenceData,
60    last_heartbeat_sent: Option<Instant>,
61    last_heartbeat_ack: Option<Instant>,
62    heartbeat_interval: Option<std::time::Duration>,
63    application_id_callback: Option<Box<dyn FnOnce(ApplicationId) + Send + Sync>>,
64    /// This is used by the heartbeater to determine whether the last heartbeat was sent without an
65    /// acknowledgement, and whether to reconnect.
66    // This must be set to `true` in `Shard::handle_event`'s `Ok(GatewayEvent::HeartbeatAck)` arm.
67    last_heartbeat_acknowledged: bool,
68    seq: u64,
69    session_id: Option<String>,
70    shard_info: ShardInfo,
71    stage: ConnectionStage,
72    /// Instant of when the shard was started.
73    // This acts as a timeout to determine if the shard has - for some reason - not started within
74    // a decent amount of time.
75    pub started: Instant,
76    pub token: String,
77    ws_url: Arc<Mutex<String>>,
78    pub intents: GatewayIntents,
79}
80
81impl Shard {
82    /// Instantiates a new instance of a Shard, bypassing the client.
83    ///
84    /// **Note**: You should likely never need to do this yourself.
85    ///
86    /// # Examples
87    ///
88    /// Instantiating a new Shard manually for a bot with no shards, and then listening for events:
89    ///
90    /// ```rust,no_run
91    /// use std::sync::Arc;
92    ///
93    /// use serenity::gateway::Shard;
94    /// use serenity::model::gateway::{GatewayIntents, ShardInfo};
95    /// use serenity::model::id::ShardId;
96    /// use tokio::sync::Mutex;
97    /// #
98    /// # use serenity::http::Http;
99    /// #
100    /// # async fn run() -> Result<(), Box<dyn std::error::Error>> {
101    /// # let http: Arc<Http> = unimplemented!();
102    /// let token = std::env::var("DISCORD_BOT_TOKEN")?;
103    /// let shard_info = ShardInfo {
104    ///     id: ShardId(0),
105    ///     total: 1,
106    /// };
107    ///
108    /// // retrieve the gateway response, which contains the URL to connect to
109    /// let gateway = Arc::new(Mutex::new(http.get_gateway().await?.url));
110    /// let shard = Shard::new(gateway, &token, shard_info, GatewayIntents::all(), None).await?;
111    ///
112    /// // at this point, you can create a `loop`, and receive events and match
113    /// // their variants
114    /// # Ok(())
115    /// # }
116    /// ```
117    ///
118    /// # Errors
119    ///
120    /// On Error, will return either [`Error::Gateway`], [`Error::Tungstenite`] or a Rustls/native
121    /// TLS error.
122    pub async fn new(
123        ws_url: Arc<Mutex<String>>,
124        token: &str,
125        shard_info: ShardInfo,
126        intents: GatewayIntents,
127        presence: Option<PresenceData>,
128    ) -> Result<Shard> {
129        let url = ws_url.lock().await.clone();
130        let client = connect(&url).await?;
131
132        let presence = presence.unwrap_or_default();
133        let last_heartbeat_sent = None;
134        let last_heartbeat_ack = None;
135        let heartbeat_interval = None;
136        let last_heartbeat_acknowledged = true;
137        let seq = 0;
138        let stage = ConnectionStage::Handshake;
139        let session_id = None;
140
141        Ok(Shard {
142            client,
143            presence,
144            last_heartbeat_sent,
145            last_heartbeat_ack,
146            heartbeat_interval,
147            application_id_callback: None,
148            last_heartbeat_acknowledged,
149            seq,
150            stage,
151            started: Instant::now(),
152            token: token.to_string(),
153            session_id,
154            shard_info,
155            ws_url,
156            intents,
157        })
158    }
159
160    /// Sets a callback to be called when the gateway receives the application's ID from Discord.
161    ///
162    /// Used internally by serenity to set the Http's internal application ID automatically.
163    pub fn set_application_id_callback(
164        &mut self,
165        callback: impl FnOnce(ApplicationId) + Send + Sync + 'static,
166    ) {
167        self.application_id_callback = Some(Box::new(callback));
168    }
169
170    /// Retrieves the current presence of the shard.
171    #[inline]
172    pub fn presence(&self) -> &PresenceData {
173        &self.presence
174    }
175
176    /// Retrieves the value of when the last heartbeat was sent.
177    #[inline]
178    pub fn last_heartbeat_sent(&self) -> Option<Instant> {
179        self.last_heartbeat_sent
180    }
181
182    /// Retrieves the value of when the last heartbeat ack was received.
183    #[inline]
184    pub fn last_heartbeat_ack(&self) -> Option<Instant> {
185        self.last_heartbeat_ack
186    }
187
188    /// Sends a heartbeat to the gateway with the current sequence.
189    ///
190    /// This sets the last heartbeat time to now, and [`Self::last_heartbeat_acknowledged`] to
191    /// `false`.
192    ///
193    /// # Errors
194    ///
195    /// Returns [`GatewayError::HeartbeatFailed`] if there was an error sending a heartbeat.
196    #[instrument(skip(self))]
197    pub async fn heartbeat(&mut self) -> Result<()> {
198        match self.client.send_heartbeat(&self.shard_info, Some(self.seq)).await {
199            Ok(()) => {
200                self.last_heartbeat_sent = Some(Instant::now());
201                self.last_heartbeat_acknowledged = false;
202
203                Ok(())
204            },
205            Err(why) => {
206                match why {
207                    Error::Tungstenite(TungsteniteError::Io(err)) => {
208                        if err.raw_os_error() != Some(32) {
209                            debug!("[{:?}] Err heartbeating: {:?}", self.shard_info, err);
210                        }
211                    },
212                    other => {
213                        warn!("[{:?}] Other err w/ keepalive: {:?}", self.shard_info, other);
214                    },
215                }
216
217                Err(Error::Gateway(GatewayError::HeartbeatFailed))
218            },
219        }
220    }
221
222    /// Returns the heartbeat interval dictated by Discord, if the Hello packet has been received.
223    #[inline]
224    pub fn heartbeat_interval(&self) -> Option<std::time::Duration> {
225        self.heartbeat_interval
226    }
227
228    #[inline]
229    pub fn last_heartbeat_acknowledged(&self) -> bool {
230        self.last_heartbeat_acknowledged
231    }
232
233    #[inline]
234    pub fn seq(&self) -> u64 {
235        self.seq
236    }
237
238    #[inline]
239    pub fn session_id(&self) -> Option<&String> {
240        self.session_id.as_ref()
241    }
242
243    #[inline]
244    #[instrument(skip(self))]
245    pub fn set_activity(&mut self, activity: Option<ActivityData>) {
246        self.presence.activity = activity;
247    }
248
249    #[inline]
250    #[instrument(skip(self))]
251    pub fn set_presence(&mut self, activity: Option<ActivityData>, status: OnlineStatus) {
252        self.set_activity(activity);
253        self.set_status(status);
254    }
255
256    #[inline]
257    #[instrument(skip(self))]
258    pub fn set_status(&mut self, mut status: OnlineStatus) {
259        if status == OnlineStatus::Offline {
260            status = OnlineStatus::Invisible;
261        }
262
263        self.presence.status = status;
264    }
265
266    /// Retrieves a copy of the current shard information.
267    ///
268    /// For example, if using 3 shards in total, and if this is shard 1, then it can be read as
269    /// "the second of three shards".
270    pub fn shard_info(&self) -> ShardInfo {
271        self.shard_info
272    }
273
274    /// Returns the current connection stage of the shard.
275    pub fn stage(&self) -> ConnectionStage {
276        self.stage
277    }
278
279    #[instrument(skip(self))]
280    fn handle_gateway_dispatch(&mut self, seq: u64, event: &Event) -> Option<ShardAction> {
281        if seq > self.seq + 1 {
282            warn!("[{:?}] Sequence off; them: {}, us: {}", self.shard_info, seq, self.seq);
283        }
284
285        match &event {
286            Event::Ready(ready) => {
287                debug!("[{:?}] Received Ready", self.shard_info);
288
289                self.session_id = Some(ready.ready.session_id.clone());
290                self.stage = ConnectionStage::Connected;
291
292                if let Some(callback) = self.application_id_callback.take() {
293                    callback(ready.ready.application.id);
294                }
295            },
296            Event::Resumed(_) => {
297                info!("[{:?}] Resumed", self.shard_info);
298
299                self.stage = ConnectionStage::Connected;
300                self.last_heartbeat_acknowledged = true;
301                self.last_heartbeat_sent = Some(Instant::now());
302                self.last_heartbeat_ack = None;
303            },
304            _ => {},
305        }
306
307        self.seq = seq;
308
309        None
310    }
311
312    #[instrument(skip(self))]
313    fn handle_heartbeat_event(&mut self, s: u64) -> ShardAction {
314        info!("[{:?}] Received shard heartbeat", self.shard_info);
315
316        // Received seq is off -- attempt to resume.
317        if s > self.seq + 1 {
318            info!(
319                "[{:?}] Received off sequence (them: {}; us: {}); resuming",
320                self.shard_info, s, self.seq
321            );
322
323            if self.stage == ConnectionStage::Handshake {
324                self.stage = ConnectionStage::Identifying;
325
326                return ShardAction::Identify;
327            }
328            warn!("[{:?}] Heartbeat during non-Handshake; auto-reconnecting", self.shard_info);
329
330            return ShardAction::Reconnect(self.reconnection_type());
331        }
332
333        ShardAction::Heartbeat
334    }
335
336    #[instrument(skip(self))]
337    fn handle_gateway_closed(
338        &mut self,
339        data: Option<&CloseFrame<'static>>,
340    ) -> Result<Option<ShardAction>> {
341        let num = data.map(|d| d.code.into());
342        let clean = num == Some(1000);
343
344        match num {
345            Some(close_codes::UNKNOWN_OPCODE) => {
346                warn!("[{:?}] Sent invalid opcode.", self.shard_info);
347            },
348            Some(close_codes::DECODE_ERROR) => {
349                warn!("[{:?}] Sent invalid message.", self.shard_info);
350            },
351            Some(close_codes::NOT_AUTHENTICATED) => {
352                warn!("[{:?}] Sent no authentication.", self.shard_info);
353
354                return Err(Error::Gateway(GatewayError::NoAuthentication));
355            },
356            Some(close_codes::AUTHENTICATION_FAILED) => {
357                error!(
358                    "[{:?}] Sent invalid authentication, please check the token.",
359                    self.shard_info
360                );
361
362                return Err(Error::Gateway(GatewayError::InvalidAuthentication));
363            },
364            Some(close_codes::ALREADY_AUTHENTICATED) => {
365                warn!("[{:?}] Already authenticated.", self.shard_info);
366            },
367            Some(close_codes::INVALID_SEQUENCE) => {
368                warn!("[{:?}] Sent invalid seq: {}.", self.shard_info, self.seq);
369
370                self.seq = 0;
371            },
372            Some(close_codes::RATE_LIMITED) => {
373                warn!("[{:?}] Gateway ratelimited.", self.shard_info);
374            },
375            Some(close_codes::INVALID_SHARD) => {
376                warn!("[{:?}] Sent invalid shard data.", self.shard_info);
377
378                return Err(Error::Gateway(GatewayError::InvalidShardData));
379            },
380            Some(close_codes::SHARDING_REQUIRED) => {
381                error!("[{:?}] Shard has too many guilds.", self.shard_info);
382
383                return Err(Error::Gateway(GatewayError::OverloadedShard));
384            },
385            Some(4006 | close_codes::SESSION_TIMEOUT) => {
386                info!("[{:?}] Invalid session.", self.shard_info);
387
388                self.session_id = None;
389            },
390            Some(close_codes::INVALID_GATEWAY_INTENTS) => {
391                error!("[{:?}] Invalid gateway intents have been provided.", self.shard_info);
392
393                return Err(Error::Gateway(GatewayError::InvalidGatewayIntents));
394            },
395            Some(close_codes::DISALLOWED_GATEWAY_INTENTS) => {
396                error!("[{:?}] Disallowed gateway intents have been provided.", self.shard_info);
397
398                return Err(Error::Gateway(GatewayError::DisallowedGatewayIntents));
399            },
400            Some(other) if !clean => {
401                warn!(
402                    "[{:?}] Unknown unclean close {}: {:?}",
403                    self.shard_info,
404                    other,
405                    data.map(|d| &d.reason),
406                );
407            },
408            _ => {},
409        }
410
411        let resume = num
412            .map_or(true, |x| x != close_codes::AUTHENTICATION_FAILED && self.session_id.is_some());
413
414        Ok(Some(if resume {
415            ShardAction::Reconnect(ReconnectType::Resume)
416        } else {
417            ShardAction::Reconnect(ReconnectType::Reidentify)
418        }))
419    }
420
421    /// Handles an event from the gateway over the receiver, requiring the receiver to be passed if
422    /// a reconnect needs to occur.
423    ///
424    /// The best case scenario is that one of two values is returned:
425    /// - `Ok(None)`: a heartbeat, late hello, or session invalidation was received;
426    /// - `Ok(Some((event, None)))`: an op0 dispatch was received, and the shard's voice state will
427    ///   be updated, _if_ the `voice` feature is enabled.
428    ///
429    /// # Errors
430    ///
431    /// Returns a [`GatewayError::InvalidAuthentication`] if invalid authentication was sent in the
432    /// IDENTIFY.
433    ///
434    /// Returns a [`GatewayError::InvalidShardData`] if invalid shard data was sent in the
435    /// IDENTIFY.
436    ///
437    /// Returns a [`GatewayError::NoAuthentication`] if no authentication was sent in the IDENTIFY.
438    ///
439    /// Returns a [`GatewayError::OverloadedShard`] if the shard would have too many guilds
440    /// assigned to it.
441    #[instrument(skip(self))]
442    pub fn handle_event(&mut self, event: &Result<GatewayEvent>) -> Result<Option<ShardAction>> {
443        match event {
444            Ok(GatewayEvent::Dispatch(seq, event)) => Ok(self.handle_gateway_dispatch(*seq, event)),
445            Ok(GatewayEvent::Heartbeat(s)) => Ok(Some(self.handle_heartbeat_event(*s))),
446            Ok(GatewayEvent::HeartbeatAck) => {
447                self.last_heartbeat_ack = Some(Instant::now());
448                self.last_heartbeat_acknowledged = true;
449
450                trace!("[{:?}] Received heartbeat ack", self.shard_info);
451
452                Ok(None)
453            },
454            &Ok(GatewayEvent::Hello(interval)) => {
455                debug!("[{:?}] Received a Hello; interval: {}", self.shard_info, interval);
456
457                if self.stage == ConnectionStage::Resuming {
458                    return Ok(None);
459                }
460
461                self.heartbeat_interval = Some(std::time::Duration::from_millis(interval));
462
463                Ok(Some(if self.stage == ConnectionStage::Handshake {
464                    ShardAction::Identify
465                } else {
466                    debug!("[{:?}] Received late Hello; autoreconnecting", self.shard_info);
467
468                    ShardAction::Reconnect(self.reconnection_type())
469                }))
470            },
471            &Ok(GatewayEvent::InvalidateSession(resumable)) => {
472                info!("[{:?}] Received session invalidation", self.shard_info);
473
474                Ok(Some(if resumable {
475                    ShardAction::Reconnect(ReconnectType::Resume)
476                } else {
477                    ShardAction::Reconnect(ReconnectType::Reidentify)
478                }))
479            },
480            Ok(GatewayEvent::Reconnect) => Ok(Some(ShardAction::Reconnect(ReconnectType::Resume))),
481            Err(Error::Gateway(GatewayError::Closed(data))) => {
482                self.handle_gateway_closed(data.as_ref())
483            },
484            Err(Error::Tungstenite(why)) => {
485                info!("[{:?}] Websocket error: {:?}", self.shard_info, why);
486                info!("[{:?}] Will attempt to auto-reconnect", self.shard_info);
487
488                Ok(Some(ShardAction::Reconnect(self.reconnection_type())))
489            },
490            Err(why) => {
491                warn!("[{:?}] Unhandled error: {:?}", self.shard_info, why);
492
493                Ok(None)
494            },
495        }
496    }
497
498    /// Does a heartbeat if needed. Returns false if something went wrong and the shard should be
499    /// restarted.
500    ///
501    /// `true` is returned under one of the following conditions:
502    /// - the heartbeat interval has not elapsed
503    /// - a heartbeat was successfully sent
504    /// - there is no known heartbeat interval yet
505    ///
506    /// `false` is returned under one of the following conditions:
507    /// - a heartbeat acknowledgement was not received in time
508    /// - an error occurred while heartbeating
509    #[instrument(skip(self))]
510    pub async fn do_heartbeat(&mut self) -> bool {
511        let Some(heartbeat_interval) = self.heartbeat_interval else {
512            // No Hello received yet
513            return self.started.elapsed() < StdDuration::from_secs(15);
514        };
515
516        // If a duration of time less than the heartbeat_interval has passed, then don't perform a
517        // keepalive or attempt to reconnect.
518        if let Some(last_sent) = self.last_heartbeat_sent {
519            if last_sent.elapsed() <= heartbeat_interval {
520                return true;
521            }
522        }
523
524        // If the last heartbeat didn't receive an acknowledgement, then auto-reconnect.
525        if !self.last_heartbeat_acknowledged {
526            debug!("[{:?}] Last heartbeat not acknowledged", self.shard_info,);
527
528            return false;
529        }
530
531        // Otherwise, we're good to heartbeat.
532        if let Err(why) = self.heartbeat().await {
533            warn!("[{:?}] Err heartbeating: {:?}", self.shard_info, why);
534
535            false
536        } else {
537            trace!("[{:?}] Heartbeat", self.shard_info);
538
539            true
540        }
541    }
542
543    /// Calculates the heartbeat latency between the shard and the gateway.
544    // Shamelessly stolen from brayzure's commit in eris:
545    // <https://github.com/abalabahaha/eris/commit/0ce296ae9a542bcec0edf1c999ee2d9986bed5a6>
546    #[instrument(skip(self))]
547    pub fn latency(&self) -> Option<StdDuration> {
548        if let (Some(sent), Some(received)) = (self.last_heartbeat_sent, self.last_heartbeat_ack) {
549            if received > sent {
550                return Some(received - sent);
551            }
552        }
553
554        None
555    }
556
557    /// Performs a deterministic reconnect.
558    ///
559    /// The type of reconnect is deterministic on whether a [`Self::session_id`].
560    ///
561    /// If the `session_id` still exists, then a RESUME is sent. If not, then an IDENTIFY is sent.
562    ///
563    /// Note that, if the shard is already in a stage of [`ConnectionStage::Connecting`], then no
564    /// action will be performed.
565    pub fn should_reconnect(&mut self) -> Option<ReconnectType> {
566        if self.stage == ConnectionStage::Connecting {
567            return None;
568        }
569
570        Some(self.reconnection_type())
571    }
572
573    pub fn reconnection_type(&self) -> ReconnectType {
574        if self.session_id().is_some() {
575            ReconnectType::Resume
576        } else {
577            ReconnectType::Reidentify
578        }
579    }
580
581    /// Requests that one or multiple [`Guild`]s be chunked.
582    ///
583    /// This will ask the gateway to start sending member chunks for large guilds (250 members+).
584    /// If a guild is over 250 members, then a full member list will not be downloaded, and must
585    /// instead be requested to be sent in "chunks" containing members.
586    ///
587    /// Member chunks are sent as the [`Event::GuildMembersChunk`] event. Each chunk only contains
588    /// a partial amount of the total members.
589    ///
590    /// If the `cache` feature is enabled, the cache will automatically be updated with member
591    /// chunks.
592    ///
593    /// # Examples
594    ///
595    /// Chunk a single guild by Id, limiting to 2000 [`Member`]s, and not
596    /// specifying a query parameter:
597    ///
598    /// ```rust,no_run
599    /// # use tokio::sync::Mutex;
600    /// # use serenity::gateway::{ChunkGuildFilter, Shard};
601    /// # use serenity::model::gateway::{GatewayIntents, ShardInfo};
602    /// # use serenity::model::id::ShardId;
603    /// # use std::sync::Arc;
604    /// #
605    /// # async fn run() -> Result<(), Box<dyn std::error::Error>> {
606    /// #     let mutex = Arc::new(Mutex::new("".to_string()));
607    /// #     let shard_info = ShardInfo {
608    /// #          id: ShardId(0),
609    /// #          total: 1,
610    /// #     };
611    /// #
612    /// #     let mut shard = Shard::new(mutex.clone(), "", shard_info, GatewayIntents::all(), None).await?;
613    /// #
614    /// use serenity::model::id::GuildId;
615    ///
616    /// shard.chunk_guild(GuildId::new(81384788765712384), Some(2000), false, ChunkGuildFilter::None, None).await?;
617    /// # Ok(())
618    /// # }
619    /// ```
620    ///
621    /// Chunk a single guild by Id, limiting to 20 members, and specifying a query parameter of
622    /// `"do"` and a nonce of `"request"`:
623    ///
624    /// ```rust,no_run
625    /// # use tokio::sync::Mutex;
626    /// # use serenity::model::gateway::{GatewayIntents, ShardInfo};
627    /// # use serenity::gateway::{ChunkGuildFilter, Shard};
628    /// # use serenity::model::id::ShardId;
629    /// # use std::error::Error;
630    /// # use std::sync::Arc;
631    /// #
632    /// # async fn run() -> Result<(), Box<dyn std::error::Error>> {
633    /// #     let mutex = Arc::new(Mutex::new("".to_string()));
634    /// #
635    /// #     let shard_info = ShardInfo {
636    /// #          id: ShardId(0),
637    /// #          total: 1,
638    /// #     };
639    /// #     let mut shard = Shard::new(mutex.clone(), "", shard_info, GatewayIntents::all(), None).await?;
640    /// #
641    /// use serenity::model::id::GuildId;
642    ///
643    /// shard
644    ///     .chunk_guild(
645    ///         GuildId::new(81384788765712384),
646    ///         Some(20),
647    ///         false,
648    ///         ChunkGuildFilter::Query("do".to_owned()),
649    ///         Some("request"),
650    ///     )
651    ///     .await?;
652    /// # Ok(())
653    /// # }
654    /// ```
655    ///
656    /// [`Event::GuildMembersChunk`]: crate::model::event::Event::GuildMembersChunk
657    /// [`Guild`]: crate::model::guild::Guild
658    /// [`Member`]: crate::model::guild::Member
659    #[instrument(skip(self))]
660    pub async fn chunk_guild(
661        &mut self,
662        guild_id: GuildId,
663        limit: Option<u16>,
664        presences: bool,
665        filter: ChunkGuildFilter,
666        nonce: Option<&str>,
667    ) -> Result<()> {
668        debug!("[{:?}] Requesting member chunks", self.shard_info);
669
670        self.client
671            .send_chunk_guild(guild_id, &self.shard_info, limit, presences, filter, nonce)
672            .await
673    }
674
675    /// Sets the shard as going into identifying stage, which sets:
676    /// - the time that the last heartbeat sent as being now
677    /// - the `stage` to [`ConnectionStage::Identifying`]
678    #[instrument(skip(self))]
679    pub async fn identify(&mut self) -> Result<()> {
680        self.client
681            .send_identify(&self.shard_info, &self.token, self.intents, &self.presence)
682            .await?;
683
684        self.last_heartbeat_sent = Some(Instant::now());
685        self.stage = ConnectionStage::Identifying;
686
687        Ok(())
688    }
689
690    /// Initializes a new WebSocket client.
691    ///
692    /// This will set the stage of the shard before and after instantiation of the client.
693    #[instrument(skip(self))]
694    pub async fn initialize(&mut self) -> Result<WsClient> {
695        debug!("[{:?}] Initializing.", self.shard_info);
696
697        // We need to do two, sort of three things here:
698        // - set the stage of the shard as opening the websocket connection
699        // - open the websocket connection
700        // - if successful, set the current stage as Handshaking
701        //
702        // This is used to accurately assess whether the state of the shard is accurate when a
703        // Hello is received.
704        self.stage = ConnectionStage::Connecting;
705        self.started = Instant::now();
706        let url = &self.ws_url.lock().await.clone();
707        let client = connect(url).await?;
708        self.stage = ConnectionStage::Handshake;
709
710        Ok(client)
711    }
712
713    #[instrument(skip(self))]
714    pub async fn reset(&mut self) {
715        self.last_heartbeat_sent = Some(Instant::now());
716        self.last_heartbeat_ack = None;
717        self.heartbeat_interval = None;
718        self.last_heartbeat_acknowledged = true;
719        self.session_id = None;
720        self.stage = ConnectionStage::Disconnected;
721        self.seq = 0;
722    }
723
724    #[instrument(skip(self))]
725    pub async fn resume(&mut self) -> Result<()> {
726        debug!("[{:?}] Attempting to resume", self.shard_info);
727
728        self.client = self.initialize().await?;
729        self.stage = ConnectionStage::Resuming;
730
731        match &self.session_id {
732            Some(session_id) => {
733                self.client.send_resume(&self.shard_info, session_id, self.seq, &self.token).await
734            },
735            None => Err(Error::Gateway(GatewayError::NoSessionId)),
736        }
737    }
738
739    #[instrument(skip(self))]
740    pub async fn reconnect(&mut self) -> Result<()> {
741        info!("[{:?}] Attempting to reconnect", self.shard_info());
742
743        self.reset().await;
744        self.client = self.initialize().await?;
745
746        Ok(())
747    }
748
749    #[instrument(skip(self))]
750    pub async fn update_presence(&mut self) -> Result<()> {
751        self.client.send_presence_update(&self.shard_info, &self.presence).await
752    }
753}
754
755async fn connect(base_url: &str) -> Result<WsClient> {
756    let url =
757        Url::parse(&format!("{base_url}?v={}", constants::GATEWAY_VERSION)).map_err(|why| {
758            warn!("Error building gateway URL with base `{}`: {:?}", base_url, why);
759
760            Error::Gateway(GatewayError::BuildingUrl)
761        })?;
762
763    WsClient::connect(url).await
764}