Skip to main content

songbird/tracks/
queue.rs

1use crate::{
2    driver::Driver,
3    events::{Event, EventContext, EventData, EventHandler, TrackEvent},
4    input::Input,
5    tracks::{Track, TrackHandle, TrackResult},
6};
7use async_trait::async_trait;
8use parking_lot::Mutex;
9use std::{collections::VecDeque, ops::Deref, sync::Arc, time::Duration};
10use tracing::{info, warn};
11
12/// A simple queue for several audio sources, designed to
13/// play in sequence.
14///
15/// This makes use of [`TrackEvent`]s to determine when the current
16/// song or audio file has finished before playing the next entry.
17///
18/// One of these is automatically included via [`Driver::queue`] when
19/// the `"builtin-queue"` feature is enabled.
20///
21/// `examples/serenity/voice_events_queue` demonstrates how a user might manage,
22/// track and use this to run a song queue in many guilds in parallel.
23/// This code is trivial to extend if extra functionality is needed.
24///
25/// # Example
26///
27/// ```rust,no_run
28/// use songbird::{
29///     driver::Driver,
30///     id::GuildId,
31///     input::File,
32///     tracks::TrackQueue,
33/// };
34/// use std::collections::HashMap;
35/// use std::num::NonZeroU64;
36///
37/// # async {
38/// let guild = GuildId(NonZeroU64::new(1).unwrap());
39/// // A Call is also valid here!
40/// let mut driver: Driver = Default::default();
41///
42/// let mut queues: HashMap<GuildId, TrackQueue> = Default::default();
43///
44/// let source = File::new("../audio/my-favourite-song.mp3");
45///
46/// // We need to ensure that this guild has a TrackQueue created for it.
47/// let queue = queues.entry(guild)
48///     .or_default();
49///
50/// // Queueing a track is this easy!
51/// queue.add_source(source.into(), &mut driver);
52/// # };
53/// ```
54///
55/// [`TrackEvent`]: crate::events::TrackEvent
56/// [`Driver::queue`]: crate::driver::Driver
57#[derive(Clone, Debug, Default)]
58pub struct TrackQueue {
59    // NOTE: the choice of a parking lot mutex is quite deliberate
60    inner: Arc<Mutex<TrackQueueCore>>,
61}
62
63/// Reference to a track which is known to be part of a queue.
64///
65/// Instances *should not* be moved from one queue to another.
66#[derive(Debug)]
67pub struct Queued(TrackHandle);
68
69impl Deref for Queued {
70    type Target = TrackHandle;
71
72    fn deref(&self) -> &Self::Target {
73        &self.0
74    }
75}
76
77impl Queued {
78    /// Clones the inner handle
79    #[must_use]
80    pub fn handle(&self) -> TrackHandle {
81        self.0.clone()
82    }
83}
84
85#[derive(Debug, Default)]
86/// Inner portion of a [`TrackQueue`].
87///
88/// This abstracts away thread-safety from the user,
89/// and offers a convenient location to store further state if required.
90///
91/// [`TrackQueue`]: TrackQueue
92struct TrackQueueCore {
93    tracks: VecDeque<Queued>,
94}
95
96struct QueueHandler {
97    remote_lock: Arc<Mutex<TrackQueueCore>>,
98}
99
100#[async_trait]
101impl EventHandler for QueueHandler {
102    async fn act(&self, ctx: &EventContext<'_>) -> Option<Event> {
103        let mut inner = self.remote_lock.lock();
104
105        // Due to possibility that users might remove, reorder,
106        // or dequeue+stop tracks, we need to verify that the FIRST
107        // track is the one who has ended.
108        match ctx {
109            EventContext::Track(ts) => {
110                // This slice should have exactly one entry.
111                // If the ended track has same id as the queue head, then
112                // we can progress the queue.
113                if inner.tracks.front()?.uuid() != ts.first()?.1.uuid() {
114                    return None;
115                }
116            },
117            _ => return None,
118        }
119
120        let _old = inner.tracks.pop_front();
121
122        info!("Queued track ended: {:?}.", ctx);
123        info!("{} tracks remain.", inner.tracks.len());
124
125        // Keep going until we find one track which works, or we run out.
126        while let Some(new) = inner.tracks.front() {
127            if new.play().is_err() {
128                // Discard files which cannot be used for whatever reason.
129                warn!("Track in Queue couldn't be played...");
130                inner.tracks.pop_front();
131            } else {
132                break;
133            }
134        }
135
136        None
137    }
138}
139
140struct SongPreloader {
141    remote_lock: Arc<Mutex<TrackQueueCore>>,
142}
143
144#[async_trait]
145impl EventHandler for SongPreloader {
146    async fn act(&self, _ctx: &EventContext<'_>) -> Option<Event> {
147        let inner = self.remote_lock.lock();
148
149        if let Some(track) = inner.tracks.get(1) {
150            // This is the sync-version so that we can fire and ignore
151            // the request ASAP.
152            drop(track.0.make_playable());
153        }
154
155        None
156    }
157}
158
159impl TrackQueue {
160    /// Create a new, empty, track queue.
161    #[must_use]
162    pub fn new() -> Self {
163        Self {
164            inner: Arc::new(Mutex::new(TrackQueueCore {
165                tracks: VecDeque::new(),
166            })),
167        }
168    }
169
170    /// Adds an audio source to the queue, to be played in the channel managed by `driver`.
171    ///
172    /// This method will preload the next track 5 seconds before the current track ends, if
173    /// the [`AuxMetadata`] can be successfully queried for a [`Duration`].
174    ///
175    /// [`AuxMetadata`]: crate::input::AuxMetadata
176    pub async fn add_source(&self, input: Input, driver: &mut Driver) -> TrackHandle {
177        self.add(input.into(), driver).await
178    }
179
180    /// Adds a [`Track`] object to the queue, to be played in the channel managed by `driver`.
181    ///
182    /// This allows additional configuration or event handlers to be added
183    /// before enqueueing the audio track. [`Track`]s will be paused pre-emptively.
184    ///
185    /// This method will preload the next track 5 seconds before the current track ends, if
186    /// the [`AuxMetadata`] can be successfully queried for a [`Duration`].
187    ///
188    /// [`AuxMetadata`]: crate::input::AuxMetadata
189    pub async fn add(&self, mut track: Track, driver: &mut Driver) -> TrackHandle {
190        let preload_time = Self::get_preload_time(&mut track).await;
191        self.add_with_preload(track, driver, preload_time)
192    }
193
194    pub(crate) async fn get_preload_time(track: &mut Track) -> Option<Duration> {
195        let meta = match track.input {
196            Input::Lazy(ref mut rec) | Input::Live(_, Some(ref mut rec)) =>
197                rec.aux_metadata().await.ok(),
198            Input::Live(_, None) => None,
199        };
200
201        meta.and_then(|meta| meta.duration)
202            .map(|d| d.saturating_sub(Duration::from_secs(5)))
203    }
204
205    /// Add an existing [`Track`] to the queue, using a known time to preload the next track.
206    ///
207    /// `preload_time` can be specified to enable gapless playback: this is the
208    /// playback position *in this track* when the the driver will begin to load the next track.
209    /// The standard [`Self::add`] method use [`AuxMetadata`] to set this to 5 seconds before
210    /// a track ends.
211    ///
212    /// A `None` value will not ready the next track until this track ends, disabling preload.
213    ///
214    /// [`AuxMetadata`]: crate::input::AuxMetadata
215    #[inline]
216    pub fn add_with_preload(
217        &self,
218        mut track: Track,
219        driver: &mut Driver,
220        preload_time: Option<Duration>,
221    ) -> TrackHandle {
222        // Attempts to start loading the next track before this one ends.
223        // Idea is to provide as close to gapless playback as possible,
224        // while minimising memory use.
225        info!("Track added to queue.");
226
227        let remote_lock = self.inner.clone();
228        track.events.add_event(
229            EventData::new(Event::Track(TrackEvent::End), QueueHandler { remote_lock }),
230            Duration::ZERO,
231        );
232
233        if let Some(time) = preload_time {
234            let remote_lock = self.inner.clone();
235            track.events.add_event(
236                EventData::new(Event::Delayed(time), SongPreloader { remote_lock }),
237                Duration::ZERO,
238            );
239        }
240
241        let (should_play, handle) = {
242            let mut inner = self.inner.lock();
243
244            let handle = driver.play(track.pause());
245            inner.tracks.push_back(Queued(handle.clone()));
246
247            (inner.tracks.len() == 1, handle)
248        };
249
250        if should_play {
251            drop(handle.play());
252        }
253
254        handle
255    }
256
257    /// Returns a handle to the currently playing track.
258    #[must_use]
259    pub fn current(&self) -> Option<TrackHandle> {
260        let inner = self.inner.lock();
261
262        inner.tracks.front().map(Queued::handle)
263    }
264
265    /// Attempts to remove a track from the specified index.
266    ///
267    /// The returned entry can be readded to *this* queue via [`modify_queue`].
268    ///
269    /// [`modify_queue`]: TrackQueue::modify_queue
270    #[must_use]
271    pub fn dequeue(&self, index: usize) -> Option<Queued> {
272        self.modify_queue(|vq| vq.remove(index))
273    }
274
275    /// Returns the number of tracks currently in the queue.
276    #[must_use]
277    pub fn len(&self) -> usize {
278        let inner = self.inner.lock();
279
280        inner.tracks.len()
281    }
282
283    /// Returns whether there are no tracks currently in the queue.
284    #[must_use]
285    pub fn is_empty(&self) -> bool {
286        let inner = self.inner.lock();
287
288        inner.tracks.is_empty()
289    }
290
291    /// Allows modification of the inner queue (i.e., deletion, reordering).
292    ///
293    /// Users must be careful to `stop` removed tracks, so as to prevent
294    /// resource leaks.
295    pub fn modify_queue<F, O>(&self, func: F) -> O
296    where
297        F: FnOnce(&mut VecDeque<Queued>) -> O,
298    {
299        let mut inner = self.inner.lock();
300        func(&mut inner.tracks)
301    }
302
303    /// Pause the track at the head of the queue.
304    pub fn pause(&self) -> TrackResult<()> {
305        let inner = self.inner.lock();
306
307        if let Some(handle) = inner.tracks.front() {
308            handle.pause()
309        } else {
310            Ok(())
311        }
312    }
313
314    /// Resume the track at the head of the queue.
315    pub fn resume(&self) -> TrackResult<()> {
316        let inner = self.inner.lock();
317
318        if let Some(handle) = inner.tracks.front() {
319            handle.play()
320        } else {
321            Ok(())
322        }
323    }
324
325    /// Stop the currently playing track, and clears the queue.
326    pub fn stop(&self) {
327        let mut inner = self.inner.lock();
328
329        for track in inner.tracks.drain(..) {
330            // Errors when removing tracks don't really make
331            // a difference: an error just implies it's already gone.
332            drop(track.stop());
333        }
334    }
335
336    /// Skip to the next track in the queue, if it exists.
337    pub fn skip(&self) -> TrackResult<()> {
338        let inner = self.inner.lock();
339
340        inner.stop_current()
341    }
342
343    /// Returns a list of currently queued tracks.
344    ///
345    /// Does not allow for modification of the queue, instead returns a snapshot of the queue at the time of calling.
346    ///
347    /// Use [`modify_queue`] for direct modification of the queue.
348    ///
349    /// [`modify_queue`]: TrackQueue::modify_queue
350    #[must_use]
351    pub fn current_queue(&self) -> Vec<TrackHandle> {
352        let inner = self.inner.lock();
353
354        inner.tracks.iter().map(Queued::handle).collect()
355    }
356}
357
358impl TrackQueueCore {
359    /// Skip to the next track in the queue, if it exists.
360    fn stop_current(&self) -> TrackResult<()> {
361        if let Some(handle) = self.tracks.front() {
362            handle.stop()
363        } else {
364            Ok(())
365        }
366    }
367}
368
369#[cfg(all(test, feature = "builtin-queue"))]
370mod tests {
371    use crate::{
372        driver::Driver,
373        input::{File, HttpRequest},
374        tracks::PlayMode,
375        Config,
376    };
377    use reqwest::Client;
378    use std::time::Duration;
379
380    #[tokio::test]
381    #[ntest::timeout(20_000)]
382    async fn next_track_plays_on_end() {
383        let (t_handle, config) = Config::test_cfg(true);
384        let mut driver = Driver::new(config.clone());
385
386        let file1 = File::new("resources/ting.wav");
387        let file2 = file1.clone();
388
389        let h1 = driver.enqueue_input(file1.into()).await;
390        let h2 = driver.enqueue_input(file2.into()).await;
391
392        // Get h1 in place, playing. Wait for IO to ready.
393        // Fast wait here since it's all local I/O, no network.
394        t_handle
395            .ready_track(&h1, Some(Duration::from_millis(1)))
396            .await;
397        t_handle
398            .ready_track(&h2, Some(Duration::from_millis(1)))
399            .await;
400
401        // playout
402        t_handle.tick(1);
403        t_handle.wait(1);
404
405        let h1a = h1.get_info();
406        let h2a = h2.get_info();
407
408        // allow get_info to fire for h2.
409        t_handle.tick(2);
410
411        // post-conditions:
412        // 1) track 1 is done & dropped (commands fail).
413        // 2) track 2 is playing.
414        assert!(h1a.await.is_err());
415        assert_eq!(h2a.await.unwrap().playing, PlayMode::Play);
416    }
417
418    #[tokio::test]
419    #[ntest::timeout(15_000)]
420    async fn next_track_plays_on_skip() {
421        let (t_handle, config) = Config::test_cfg(true);
422        let mut driver = Driver::new(config.clone());
423
424        let file1 = File::new("resources/ting.wav");
425        let file2 = file1.clone();
426
427        let h1 = driver.enqueue_input(file1.into()).await;
428        let h2 = driver.enqueue_input(file2.into()).await;
429
430        // Get h1 in place, playing. Wait for IO to ready.
431        // Fast wait here since it's all local I/O, no network.
432        t_handle
433            .ready_track(&h1, Some(Duration::from_millis(1)))
434            .await;
435
436        assert!(driver.queue().skip().is_ok());
437
438        t_handle
439            .ready_track(&h2, Some(Duration::from_millis(1)))
440            .await;
441
442        // playout
443        t_handle.skip(1).await;
444
445        let h1a = h1.get_info();
446        let h2a = h2.get_info();
447
448        // allow get_info to fire for h2.
449        t_handle.tick(2);
450
451        // post-conditions:
452        // 1) track 1 is done & dropped (commands fail).
453        // 2) track 2 is playing.
454        assert!(h1a.await.is_err());
455        assert_eq!(h2a.await.unwrap().playing, PlayMode::Play);
456    }
457
458    #[tokio::test]
459    #[ntest::timeout(15_000)]
460    async fn next_track_plays_on_err() {
461        let (t_handle, config) = Config::test_cfg(true);
462        let mut driver = Driver::new(config.clone());
463
464        // File 1 is HTML with no valid audio -- this will fail to play.
465        let file1 = HttpRequest::new(
466            Client::new(),
467            "http://github.com/serenity-rs/songbird/".into(),
468        );
469        let file2 = File::new("resources/ting.wav");
470
471        let h1 = driver.enqueue_input(file1.into()).await;
472        let h2 = driver.enqueue_input(file2.into()).await;
473
474        // Get h1 in place, playing. Wait for IO to ready.
475        // Fast wait here since it's all local I/O, no network.
476        // t_handle
477        //     .ready_track(&h1, Some(Duration::from_millis(1)))
478        //     .await;
479        t_handle
480            .ready_track(&h2, Some(Duration::from_millis(1)))
481            .await;
482
483        // playout
484        t_handle.tick(1);
485        t_handle.wait(1);
486
487        let h1a = h1.get_info();
488        let h2a = h2.get_info();
489
490        // allow get_info to fire for h2.
491        t_handle.tick(2);
492
493        // post-conditions:
494        // 1) track 1 is done & dropped (commands fail).
495        // 2) track 2 is playing.
496        assert!(h1a.await.is_err());
497        assert_eq!(h2a.await.unwrap().playing, PlayMode::Play);
498    }
499}