songbird/tracks/queue.rs
1use crate::{
2 driver::Driver,
3 events::{Event, EventContext, EventData, EventHandler, TrackEvent},
4 input::Input,
5 tracks::{Track, TrackHandle, TrackResult},
6};
7use async_trait::async_trait;
8use parking_lot::Mutex;
9use std::{collections::VecDeque, ops::Deref, sync::Arc, time::Duration};
10use tracing::{info, warn};
11
/// A simple queue for several audio sources, designed to
/// play in sequence.
///
/// This makes use of [`TrackEvent`]s to determine when the current
/// song or audio file has finished before playing the next entry.
///
/// One of these is automatically included via [`Driver::queue`] when
/// the `"builtin-queue"` feature is enabled.
///
/// `examples/serenity/voice_events_queue` demonstrates how a user might manage,
/// track and use this to run a song queue in many guilds in parallel.
/// This code is trivial to extend if extra functionality is needed.
///
/// Cloning a `TrackQueue` is cheap and yields another handle onto the *same*
/// underlying queue, as the state is held behind a shared pointer.
///
/// # Example
///
/// ```rust,no_run
/// use songbird::{
///     driver::Driver,
///     id::GuildId,
///     input::File,
///     tracks::TrackQueue,
/// };
/// use std::collections::HashMap;
/// use std::num::NonZeroU64;
///
/// # async {
/// let guild = GuildId(NonZeroU64::new(1).unwrap());
/// // A Call is also valid here!
/// let mut driver: Driver = Default::default();
///
/// let mut queues: HashMap<GuildId, TrackQueue> = Default::default();
///
/// let source = File::new("../audio/my-favourite-song.mp3");
///
/// // We need to ensure that this guild has a TrackQueue created for it.
/// let queue = queues.entry(guild)
///     .or_default();
///
/// // Queueing a track is this easy!
/// // `add_source` is async: the track is only enqueued once it is awaited.
/// queue.add_source(source.into(), &mut driver).await;
/// # };
/// ```
///
/// [`TrackEvent`]: crate::events::TrackEvent
/// [`Driver::queue`]: crate::driver::Driver
#[derive(Clone, Debug, Default)]
pub struct TrackQueue {
    // NOTE: the choice of a parking lot mutex is quite deliberate.
    // It is locked synchronously, including from within the async event
    // handlers in this file, none of which `.await` while holding the guard.
    inner: Arc<Mutex<TrackQueueCore>>,
}
62
/// Reference to a track which is known to be part of a queue.
///
/// This is a thin wrapper around a [`TrackHandle`]; it dereferences to the
/// wrapped handle, so all of the handle's methods may be called on it directly.
///
/// Instances *should not* be moved from one queue to another.
#[derive(Debug)]
pub struct Queued(TrackHandle);
68
69impl Deref for Queued {
70 type Target = TrackHandle;
71
72 fn deref(&self) -> &Self::Target {
73 &self.0
74 }
75}
76
77impl Queued {
78 /// Clones the inner handle
79 #[must_use]
80 pub fn handle(&self) -> TrackHandle {
81 self.0.clone()
82 }
83}
84
#[derive(Debug, Default)]
/// Inner portion of a [`TrackQueue`].
///
/// This abstracts away thread-safety from the user,
/// and offers a convenient location to store further state if required.
///
/// [`TrackQueue`]: TrackQueue
struct TrackQueueCore {
    /// Queued tracks in playback order: the front entry is the one treated
    /// as the current track, and new tracks are pushed onto the back.
    tracks: VecDeque<Queued>,
}
95
/// Event handler registered on every queued track for [`TrackEvent::End`].
///
/// When the track at the head of the queue ends, it removes that entry and
/// starts the next playable one.
struct QueueHandler {
    /// Shared state of the queue this handler advances.
    remote_lock: Arc<Mutex<TrackQueueCore>>,
}
99
100#[async_trait]
101impl EventHandler for QueueHandler {
102 async fn act(&self, ctx: &EventContext<'_>) -> Option<Event> {
103 let mut inner = self.remote_lock.lock();
104
105 // Due to possibility that users might remove, reorder,
106 // or dequeue+stop tracks, we need to verify that the FIRST
107 // track is the one who has ended.
108 match ctx {
109 EventContext::Track(ts) => {
110 // This slice should have exactly one entry.
111 // If the ended track has same id as the queue head, then
112 // we can progress the queue.
113 if inner.tracks.front()?.uuid() != ts.first()?.1.uuid() {
114 return None;
115 }
116 },
117 _ => return None,
118 }
119
120 let _old = inner.tracks.pop_front();
121
122 info!("Queued track ended: {:?}.", ctx);
123 info!("{} tracks remain.", inner.tracks.len());
124
125 // Keep going until we find one track which works, or we run out.
126 while let Some(new) = inner.tracks.front() {
127 if new.play().is_err() {
128 // Discard files which cannot be used for whatever reason.
129 warn!("Track in Queue couldn't be played...");
130 inner.tracks.pop_front();
131 } else {
132 break;
133 }
134 }
135
136 None
137 }
138}
139
/// Event handler registered as a delayed event on a queued track when a
/// preload time is known.
///
/// When fired, it asks the entry directly after the queue head to make
/// itself playable.
struct SongPreloader {
    /// Shared state of the queue whose next entry should be preloaded.
    remote_lock: Arc<Mutex<TrackQueueCore>>,
}
143
144#[async_trait]
145impl EventHandler for SongPreloader {
146 async fn act(&self, _ctx: &EventContext<'_>) -> Option<Event> {
147 let inner = self.remote_lock.lock();
148
149 if let Some(track) = inner.tracks.get(1) {
150 // This is the sync-version so that we can fire and ignore
151 // the request ASAP.
152 drop(track.0.make_playable());
153 }
154
155 None
156 }
157}
158
159impl TrackQueue {
160 /// Create a new, empty, track queue.
161 #[must_use]
162 pub fn new() -> Self {
163 Self {
164 inner: Arc::new(Mutex::new(TrackQueueCore {
165 tracks: VecDeque::new(),
166 })),
167 }
168 }
169
170 /// Adds an audio source to the queue, to be played in the channel managed by `driver`.
171 ///
172 /// This method will preload the next track 5 seconds before the current track ends, if
173 /// the [`AuxMetadata`] can be successfully queried for a [`Duration`].
174 ///
175 /// [`AuxMetadata`]: crate::input::AuxMetadata
176 pub async fn add_source(&self, input: Input, driver: &mut Driver) -> TrackHandle {
177 self.add(input.into(), driver).await
178 }
179
180 /// Adds a [`Track`] object to the queue, to be played in the channel managed by `driver`.
181 ///
182 /// This allows additional configuration or event handlers to be added
183 /// before enqueueing the audio track. [`Track`]s will be paused pre-emptively.
184 ///
185 /// This method will preload the next track 5 seconds before the current track ends, if
186 /// the [`AuxMetadata`] can be successfully queried for a [`Duration`].
187 ///
188 /// [`AuxMetadata`]: crate::input::AuxMetadata
189 pub async fn add(&self, mut track: Track, driver: &mut Driver) -> TrackHandle {
190 let preload_time = Self::get_preload_time(&mut track).await;
191 self.add_with_preload(track, driver, preload_time)
192 }
193
194 pub(crate) async fn get_preload_time(track: &mut Track) -> Option<Duration> {
195 let meta = match track.input {
196 Input::Lazy(ref mut rec) | Input::Live(_, Some(ref mut rec)) =>
197 rec.aux_metadata().await.ok(),
198 Input::Live(_, None) => None,
199 };
200
201 meta.and_then(|meta| meta.duration)
202 .map(|d| d.saturating_sub(Duration::from_secs(5)))
203 }
204
205 /// Add an existing [`Track`] to the queue, using a known time to preload the next track.
206 ///
207 /// `preload_time` can be specified to enable gapless playback: this is the
208 /// playback position *in this track* when the the driver will begin to load the next track.
209 /// The standard [`Self::add`] method use [`AuxMetadata`] to set this to 5 seconds before
210 /// a track ends.
211 ///
212 /// A `None` value will not ready the next track until this track ends, disabling preload.
213 ///
214 /// [`AuxMetadata`]: crate::input::AuxMetadata
215 #[inline]
216 pub fn add_with_preload(
217 &self,
218 mut track: Track,
219 driver: &mut Driver,
220 preload_time: Option<Duration>,
221 ) -> TrackHandle {
222 // Attempts to start loading the next track before this one ends.
223 // Idea is to provide as close to gapless playback as possible,
224 // while minimising memory use.
225 info!("Track added to queue.");
226
227 let remote_lock = self.inner.clone();
228 track.events.add_event(
229 EventData::new(Event::Track(TrackEvent::End), QueueHandler { remote_lock }),
230 Duration::ZERO,
231 );
232
233 if let Some(time) = preload_time {
234 let remote_lock = self.inner.clone();
235 track.events.add_event(
236 EventData::new(Event::Delayed(time), SongPreloader { remote_lock }),
237 Duration::ZERO,
238 );
239 }
240
241 let (should_play, handle) = {
242 let mut inner = self.inner.lock();
243
244 let handle = driver.play(track.pause());
245 inner.tracks.push_back(Queued(handle.clone()));
246
247 (inner.tracks.len() == 1, handle)
248 };
249
250 if should_play {
251 drop(handle.play());
252 }
253
254 handle
255 }
256
257 /// Returns a handle to the currently playing track.
258 #[must_use]
259 pub fn current(&self) -> Option<TrackHandle> {
260 let inner = self.inner.lock();
261
262 inner.tracks.front().map(Queued::handle)
263 }
264
265 /// Attempts to remove a track from the specified index.
266 ///
267 /// The returned entry can be readded to *this* queue via [`modify_queue`].
268 ///
269 /// [`modify_queue`]: TrackQueue::modify_queue
270 #[must_use]
271 pub fn dequeue(&self, index: usize) -> Option<Queued> {
272 self.modify_queue(|vq| vq.remove(index))
273 }
274
275 /// Returns the number of tracks currently in the queue.
276 #[must_use]
277 pub fn len(&self) -> usize {
278 let inner = self.inner.lock();
279
280 inner.tracks.len()
281 }
282
283 /// Returns whether there are no tracks currently in the queue.
284 #[must_use]
285 pub fn is_empty(&self) -> bool {
286 let inner = self.inner.lock();
287
288 inner.tracks.is_empty()
289 }
290
291 /// Allows modification of the inner queue (i.e., deletion, reordering).
292 ///
293 /// Users must be careful to `stop` removed tracks, so as to prevent
294 /// resource leaks.
295 pub fn modify_queue<F, O>(&self, func: F) -> O
296 where
297 F: FnOnce(&mut VecDeque<Queued>) -> O,
298 {
299 let mut inner = self.inner.lock();
300 func(&mut inner.tracks)
301 }
302
303 /// Pause the track at the head of the queue.
304 pub fn pause(&self) -> TrackResult<()> {
305 let inner = self.inner.lock();
306
307 if let Some(handle) = inner.tracks.front() {
308 handle.pause()
309 } else {
310 Ok(())
311 }
312 }
313
314 /// Resume the track at the head of the queue.
315 pub fn resume(&self) -> TrackResult<()> {
316 let inner = self.inner.lock();
317
318 if let Some(handle) = inner.tracks.front() {
319 handle.play()
320 } else {
321 Ok(())
322 }
323 }
324
325 /// Stop the currently playing track, and clears the queue.
326 pub fn stop(&self) {
327 let mut inner = self.inner.lock();
328
329 for track in inner.tracks.drain(..) {
330 // Errors when removing tracks don't really make
331 // a difference: an error just implies it's already gone.
332 drop(track.stop());
333 }
334 }
335
336 /// Skip to the next track in the queue, if it exists.
337 pub fn skip(&self) -> TrackResult<()> {
338 let inner = self.inner.lock();
339
340 inner.stop_current()
341 }
342
343 /// Returns a list of currently queued tracks.
344 ///
345 /// Does not allow for modification of the queue, instead returns a snapshot of the queue at the time of calling.
346 ///
347 /// Use [`modify_queue`] for direct modification of the queue.
348 ///
349 /// [`modify_queue`]: TrackQueue::modify_queue
350 #[must_use]
351 pub fn current_queue(&self) -> Vec<TrackHandle> {
352 let inner = self.inner.lock();
353
354 inner.tracks.iter().map(Queued::handle).collect()
355 }
356}
357
358impl TrackQueueCore {
359 /// Skip to the next track in the queue, if it exists.
360 fn stop_current(&self) -> TrackResult<()> {
361 if let Some(handle) = self.tracks.front() {
362 handle.stop()
363 } else {
364 Ok(())
365 }
366 }
367}
368
// Tests for the driver's built-in queue; only compiled when the
// `builtin-queue` feature is enabled.
#[cfg(all(test, feature = "builtin-queue"))]
mod tests {
    use crate::{
        driver::Driver,
        input::{File, HttpRequest},
        tracks::PlayMode,
        Config,
    };
    use reqwest::Client;
    use std::time::Duration;

    /// Once the first queued track has played out, the second should be playing.
    #[tokio::test]
    #[ntest::timeout(20_000)]
    async fn next_track_plays_on_end() {
        let (t_handle, config) = Config::test_cfg(true);
        let mut driver = Driver::new(config.clone());

        let file1 = File::new("resources/ting.wav");
        let file2 = file1.clone();

        let h1 = driver.enqueue_input(file1.into()).await;
        let h2 = driver.enqueue_input(file2.into()).await;

        // Get h1 in place, playing. Wait for IO to ready.
        // Fast wait here since it's all local I/O, no network.
        t_handle
            .ready_track(&h1, Some(Duration::from_millis(1)))
            .await;
        t_handle
            .ready_track(&h2, Some(Duration::from_millis(1)))
            .await;

        // playout
        t_handle.tick(1);
        t_handle.wait(1);

        // Issue both info requests before ticking again; they are awaited below.
        let h1a = h1.get_info();
        let h2a = h2.get_info();

        // allow get_info to fire for h2.
        t_handle.tick(2);

        // post-conditions:
        // 1) track 1 is done & dropped (commands fail).
        // 2) track 2 is playing.
        assert!(h1a.await.is_err());
        assert_eq!(h2a.await.unwrap().playing, PlayMode::Play);
    }

    /// Skipping the first queued track should leave the second one playing.
    #[tokio::test]
    #[ntest::timeout(15_000)]
    async fn next_track_plays_on_skip() {
        let (t_handle, config) = Config::test_cfg(true);
        let mut driver = Driver::new(config.clone());

        let file1 = File::new("resources/ting.wav");
        let file2 = file1.clone();

        let h1 = driver.enqueue_input(file1.into()).await;
        let h2 = driver.enqueue_input(file2.into()).await;

        // Get h1 in place, playing. Wait for IO to ready.
        // Fast wait here since it's all local I/O, no network.
        t_handle
            .ready_track(&h1, Some(Duration::from_millis(1)))
            .await;

        // Skip the head of the queue via the driver's built-in queue.
        assert!(driver.queue().skip().is_ok());

        t_handle
            .ready_track(&h2, Some(Duration::from_millis(1)))
            .await;

        // playout
        t_handle.skip(1).await;

        // Issue both info requests before ticking again; they are awaited below.
        let h1a = h1.get_info();
        let h2a = h2.get_info();

        // allow get_info to fire for h2.
        t_handle.tick(2);

        // post-conditions:
        // 1) track 1 is done & dropped (commands fail).
        // 2) track 2 is playing.
        assert!(h1a.await.is_err());
        assert_eq!(h2a.await.unwrap().playing, PlayMode::Play);
    }

    /// If the first queued track cannot be played, the second should be playing.
    #[tokio::test]
    #[ntest::timeout(15_000)]
    async fn next_track_plays_on_err() {
        let (t_handle, config) = Config::test_cfg(true);
        let mut driver = Driver::new(config.clone());

        // File 1 is HTML with no valid audio -- this will fail to play.
        let file1 = HttpRequest::new(
            Client::new(),
            "http://github.com/serenity-rs/songbird/".into(),
        );
        let file2 = File::new("resources/ting.wav");

        let h1 = driver.enqueue_input(file1.into()).await;
        let h2 = driver.enqueue_input(file2.into()).await;

        // Only h2 is waited on here. Unlike the other tests, h1 is an HTTP
        // request rather than local I/O.
        // NOTE(review): a `ready_track` wait on h1 used to sit here and was
        // disabled -- presumably because h1 never becomes ready; confirm.
        t_handle
            .ready_track(&h2, Some(Duration::from_millis(1)))
            .await;

        // playout
        t_handle.tick(1);
        t_handle.wait(1);

        // Issue both info requests before ticking again; they are awaited below.
        let h1a = h1.get_info();
        let h2a = h2.get_info();

        // allow get_info to fire for h2.
        t_handle.tick(2);

        // post-conditions:
        // 1) track 1 is done & dropped (commands fail).
        // 2) track 2 is playing.
        assert!(h1a.await.is_err());
        assert_eq!(h2a.await.unwrap().playing, PlayMode::Play);
    }
}