tokio/io/async_fd.rs
1use crate::io::{Interest, Ready};
2use crate::runtime::io::{ReadyEvent, Registration};
3use crate::runtime::scheduler;
4
5use mio::unix::SourceFd;
6use std::error::Error;
7use std::fmt;
8use std::io;
9use std::os::unix::io::{AsRawFd, RawFd};
10use std::task::{ready, Context, Poll};
11
12/// Associates an IO object backed by a Unix file descriptor with the tokio
13/// reactor, allowing for readiness to be polled. The file descriptor must be of
14/// a type that can be used with the OS polling facilities (ie, `poll`, `epoll`,
15/// `kqueue`, etc), such as a network socket or pipe, and the file descriptor
16/// must have the nonblocking mode set to true.
17///
18/// Creating an [`AsyncFd`] registers the file descriptor with the current tokio
19/// Reactor, allowing you to directly await the file descriptor being readable
20/// or writable. Once registered, the file descriptor remains registered until
21/// the [`AsyncFd`] is dropped.
22///
23/// The [`AsyncFd`] takes ownership of an arbitrary object to represent the IO
24/// object. It is intended that the inner object will handle closing the file
25/// descriptor when it is dropped, avoiding resource leaks and ensuring that the
26/// [`AsyncFd`] can clean up the registration before closing the file descriptor.
27/// The [`AsyncFd::into_inner`] function can be used to extract the inner object
28/// to retake control from the tokio IO reactor. The [`OwnedFd`] type is often
29/// used as the inner object, as it is the simplest type that closes the fd on
30/// drop.
31///
32/// The inner object is required to implement [`AsRawFd`]. This file descriptor
33/// must not change while [`AsyncFd`] owns the inner object, i.e. the
34/// [`AsRawFd::as_raw_fd`] method on the inner type must always return the same
35/// file descriptor when called multiple times. Failure to uphold this results
36/// in unspecified behavior in the IO driver, which may include breaking
37/// notifications for other sockets/etc.
38///
39/// Polling for readiness is done by calling the async functions [`readable`]
40/// and [`writable`]. These functions complete when the associated readiness
41/// condition is observed. Any number of tasks can query the same `AsyncFd` in
42/// parallel, on the same or different conditions.
43///
44/// On some platforms, the readiness detecting mechanism relies on
45/// edge-triggered notifications. This means that the OS will only notify Tokio
46/// when the file descriptor transitions from not-ready to ready. For this to
47/// work you should first try to read or write and only poll for readiness
48/// if that fails with an error of [`std::io::ErrorKind::WouldBlock`].
49///
50/// Tokio internally tracks when it has received a ready notification, and when
51/// readiness checking functions like [`readable`] and [`writable`] are called,
52/// if the readiness flag is set, these async functions will complete
53/// immediately. This however does mean that it is critical to ensure that this
54/// ready flag is cleared when (and only when) the file descriptor ceases to be
55/// ready. The [`AsyncFdReadyGuard`] returned from readiness checking functions
56/// serves this function; after calling a readiness-checking async function,
57/// you must use this [`AsyncFdReadyGuard`] to signal to tokio whether the file
58/// descriptor is no longer in a ready state.
59///
60/// ## Use with to a poll-based API
61///
62/// In some cases it may be desirable to use `AsyncFd` from APIs similar to
63/// [`TcpStream::poll_read_ready`]. The [`AsyncFd::poll_read_ready`] and
64/// [`AsyncFd::poll_write_ready`] functions are provided for this purpose.
65/// Because these functions don't create a future to hold their state, they have
66/// the limitation that only one task can wait on each direction (read or write)
67/// at a time.
68///
69/// # Examples
70///
71/// This example shows how to turn [`std::net::TcpStream`] asynchronous using
72/// `AsyncFd`. It implements the read/write operations both as an `async fn`
73/// and using the IO traits [`AsyncRead`] and [`AsyncWrite`].
74///
75/// ```no_run
76/// use std::io::{self, Read, Write};
77/// use std::net::TcpStream;
78/// use std::pin::Pin;
79/// use std::task::{ready, Context, Poll};
80/// use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
81/// use tokio::io::unix::AsyncFd;
82///
83/// pub struct AsyncTcpStream {
84/// inner: AsyncFd<TcpStream>,
85/// }
86///
87/// impl AsyncTcpStream {
88/// pub fn new(tcp: TcpStream) -> io::Result<Self> {
89/// tcp.set_nonblocking(true)?;
90/// Ok(Self {
91/// inner: AsyncFd::new(tcp)?,
92/// })
93/// }
94///
95/// pub async fn read(&self, out: &mut [u8]) -> io::Result<usize> {
96/// loop {
97/// let mut guard = self.inner.readable().await?;
98///
99/// match guard.try_io(|inner| inner.get_ref().read(out)) {
100/// Ok(result) => return result,
101/// Err(_would_block) => continue,
102/// }
103/// }
104/// }
105///
106/// pub async fn write(&self, buf: &[u8]) -> io::Result<usize> {
107/// loop {
108/// let mut guard = self.inner.writable().await?;
109///
110/// match guard.try_io(|inner| inner.get_ref().write(buf)) {
111/// Ok(result) => return result,
112/// Err(_would_block) => continue,
113/// }
114/// }
115/// }
116/// }
117///
118/// impl AsyncRead for AsyncTcpStream {
119/// fn poll_read(
120/// self: Pin<&mut Self>,
121/// cx: &mut Context<'_>,
122/// buf: &mut ReadBuf<'_>
123/// ) -> Poll<io::Result<()>> {
124/// loop {
125/// let mut guard = ready!(self.inner.poll_read_ready(cx))?;
126///
127/// let unfilled = buf.initialize_unfilled();
128/// match guard.try_io(|inner| inner.get_ref().read(unfilled)) {
129/// Ok(Ok(len)) => {
130/// buf.advance(len);
131/// return Poll::Ready(Ok(()));
132/// },
133/// Ok(Err(err)) => return Poll::Ready(Err(err)),
134/// Err(_would_block) => continue,
135/// }
136/// }
137/// }
138/// }
139///
140/// impl AsyncWrite for AsyncTcpStream {
141/// fn poll_write(
142/// self: Pin<&mut Self>,
143/// cx: &mut Context<'_>,
144/// buf: &[u8]
145/// ) -> Poll<io::Result<usize>> {
146/// loop {
147/// let mut guard = ready!(self.inner.poll_write_ready(cx))?;
148///
149/// match guard.try_io(|inner| inner.get_ref().write(buf)) {
150/// Ok(result) => return Poll::Ready(result),
151/// Err(_would_block) => continue,
152/// }
153/// }
154/// }
155///
156/// fn poll_flush(
157/// self: Pin<&mut Self>,
158/// cx: &mut Context<'_>,
159/// ) -> Poll<io::Result<()>> {
160/// // tcp flush is a no-op
161/// Poll::Ready(Ok(()))
162/// }
163///
164/// fn poll_shutdown(
165/// self: Pin<&mut Self>,
166/// cx: &mut Context<'_>,
167/// ) -> Poll<io::Result<()>> {
168/// self.inner.get_ref().shutdown(std::net::Shutdown::Write)?;
169/// Poll::Ready(Ok(()))
170/// }
171/// }
172/// ```
173///
174/// [`readable`]: method@Self::readable
175/// [`writable`]: method@Self::writable
176/// [`AsyncFdReadyGuard`]: struct@self::AsyncFdReadyGuard
177/// [`TcpStream::poll_read_ready`]: struct@crate::net::TcpStream
178/// [`AsyncRead`]: trait@crate::io::AsyncRead
179/// [`AsyncWrite`]: trait@crate::io::AsyncWrite
180/// [`OwnedFd`]: struct@std::os::fd::OwnedFd
181pub struct AsyncFd<T: AsRawFd> {
182 registration: Registration,
183 // The inner value is always present. the Option is required for `drop` and `into_inner`.
184 // In all other methods `unwrap` is valid, and will never panic.
185 inner: Option<T>,
186}
187
188/// Represents an IO-ready event detected on a particular file descriptor that
189/// has not yet been acknowledged. This is a `must_use` structure to help ensure
190/// that you do not forget to explicitly clear (or not clear) the event.
191///
192/// This type exposes an immutable reference to the underlying IO object.
193#[must_use = "You must explicitly choose whether to clear the readiness state by calling a method on ReadyGuard"]
194pub struct AsyncFdReadyGuard<'a, T: AsRawFd> {
195 async_fd: &'a AsyncFd<T>,
196 event: Option<ReadyEvent>,
197}
198
199/// Represents an IO-ready event detected on a particular file descriptor that
200/// has not yet been acknowledged. This is a `must_use` structure to help ensure
201/// that you do not forget to explicitly clear (or not clear) the event.
202///
203/// This type exposes a mutable reference to the underlying IO object.
204#[must_use = "You must explicitly choose whether to clear the readiness state by calling a method on ReadyGuard"]
205pub struct AsyncFdReadyMutGuard<'a, T: AsRawFd> {
206 async_fd: &'a mut AsyncFd<T>,
207 event: Option<ReadyEvent>,
208}
209
210impl<T: AsRawFd> AsyncFd<T> {
211 /// Creates an [`AsyncFd`] backed by (and taking ownership of) an object
212 /// implementing [`AsRawFd`]. The backing file descriptor is cached at the
213 /// time of creation.
214 ///
215 /// Only configures the [`Interest::READABLE`] and [`Interest::WRITABLE`] interests. For more
216 /// control, use [`AsyncFd::with_interest`].
217 ///
218 /// This method must be called in the context of a tokio runtime.
219 ///
220 /// # Panics
221 ///
222 /// This function panics if there is no current reactor set, or if the `rt`
223 /// feature flag is not enabled.
224 #[inline]
225 #[track_caller]
226 pub fn new(inner: T) -> io::Result<Self>
227 where
228 T: AsRawFd,
229 {
230 Self::with_interest(inner, Interest::READABLE | Interest::WRITABLE)
231 }
232
233 /// Creates an [`AsyncFd`] backed by (and taking ownership of) an object
234 /// implementing [`AsRawFd`], with a specific [`Interest`]. The backing
235 /// file descriptor is cached at the time of creation.
236 ///
237 /// # Panics
238 ///
239 /// This function panics if there is no current reactor set, or if the `rt`
240 /// feature flag is not enabled.
241 #[inline]
242 #[track_caller]
243 pub fn with_interest(inner: T, interest: Interest) -> io::Result<Self>
244 where
245 T: AsRawFd,
246 {
247 Self::new_with_handle_and_interest(inner, scheduler::Handle::current(), interest)
248 }
249
250 #[track_caller]
251 pub(crate) fn new_with_handle_and_interest(
252 inner: T,
253 handle: scheduler::Handle,
254 interest: Interest,
255 ) -> io::Result<Self> {
256 Self::try_new_with_handle_and_interest(inner, handle, interest).map_err(Into::into)
257 }
258
259 /// Creates an [`AsyncFd`] backed by (and taking ownership of) an object
260 /// implementing [`AsRawFd`]. The backing file descriptor is cached at the
261 /// time of creation.
262 ///
263 /// Only configures the [`Interest::READABLE`] and [`Interest::WRITABLE`] interests. For more
264 /// control, use [`AsyncFd::try_with_interest`].
265 ///
266 /// This method must be called in the context of a tokio runtime.
267 ///
268 /// In the case of failure, it returns [`AsyncFdTryNewError`] that contains the original object
269 /// passed to this function.
270 ///
271 /// # Panics
272 ///
273 /// This function panics if there is no current reactor set, or if the `rt`
274 /// feature flag is not enabled.
275 #[inline]
276 #[track_caller]
277 pub fn try_new(inner: T) -> Result<Self, AsyncFdTryNewError<T>>
278 where
279 T: AsRawFd,
280 {
281 Self::try_with_interest(inner, Interest::READABLE | Interest::WRITABLE)
282 }
283
284 /// Creates an [`AsyncFd`] backed by (and taking ownership of) an object
285 /// implementing [`AsRawFd`], with a specific [`Interest`]. The backing
286 /// file descriptor is cached at the time of creation.
287 ///
288 /// In the case of failure, it returns [`AsyncFdTryNewError`] that contains the original object
289 /// passed to this function.
290 ///
291 /// # Panics
292 ///
293 /// This function panics if there is no current reactor set, or if the `rt`
294 /// feature flag is not enabled.
295 #[inline]
296 #[track_caller]
297 pub fn try_with_interest(inner: T, interest: Interest) -> Result<Self, AsyncFdTryNewError<T>>
298 where
299 T: AsRawFd,
300 {
301 Self::try_new_with_handle_and_interest(inner, scheduler::Handle::current(), interest)
302 }
303
304 #[track_caller]
305 pub(crate) fn try_new_with_handle_and_interest(
306 inner: T,
307 handle: scheduler::Handle,
308 interest: Interest,
309 ) -> Result<Self, AsyncFdTryNewError<T>> {
310 let fd = inner.as_raw_fd();
311
312 match Registration::new_with_interest_and_handle(&mut SourceFd(&fd), interest, handle) {
313 Ok(registration) => Ok(AsyncFd {
314 registration,
315 inner: Some(inner),
316 }),
317 Err(cause) => Err(AsyncFdTryNewError { inner, cause }),
318 }
319 }
320
321 /// Returns a shared reference to the backing object of this [`AsyncFd`].
322 #[inline]
323 pub fn get_ref(&self) -> &T {
324 self.inner.as_ref().unwrap()
325 }
326
327 /// Returns a mutable reference to the backing object of this [`AsyncFd`].
328 #[inline]
329 pub fn get_mut(&mut self) -> &mut T {
330 self.inner.as_mut().unwrap()
331 }
332
333 fn take_inner(&mut self) -> Option<T> {
334 let inner = self.inner.take()?;
335 let fd = inner.as_raw_fd();
336
337 let _ = self.registration.deregister(&mut SourceFd(&fd));
338
339 Some(inner)
340 }
341
342 /// Deregisters this file descriptor and returns ownership of the backing
343 /// object.
344 pub fn into_inner(mut self) -> T {
345 self.take_inner().unwrap()
346 }
347
348 /// Polls for read readiness.
349 ///
350 /// If the file descriptor is not currently ready for reading, this method
351 /// will store a clone of the [`Waker`] from the provided [`Context`]. When the
352 /// file descriptor becomes ready for reading, [`Waker::wake`] will be called.
353 ///
354 /// Note that on multiple calls to [`poll_read_ready`] or
355 /// [`poll_read_ready_mut`], only the `Waker` from the `Context` passed to the
356 /// most recent call is scheduled to receive a wakeup. (However,
357 /// [`poll_write_ready`] retains a second, independent waker).
358 ///
359 /// This method is intended for cases where creating and pinning a future
360 /// via [`readable`] is not feasible. Where possible, using [`readable`] is
361 /// preferred, as this supports polling from multiple tasks at once.
362 ///
363 /// This method takes `&self`, so it is possible to call this method
364 /// concurrently with other methods on this struct. This method only
365 /// provides shared access to the inner IO resource when handling the
366 /// [`AsyncFdReadyGuard`].
367 ///
368 /// [`poll_read_ready`]: method@Self::poll_read_ready
369 /// [`poll_read_ready_mut`]: method@Self::poll_read_ready_mut
370 /// [`poll_write_ready`]: method@Self::poll_write_ready
371 /// [`readable`]: method@Self::readable
372 /// [`Context`]: struct@std::task::Context
373 /// [`Waker`]: struct@std::task::Waker
374 /// [`Waker::wake`]: method@std::task::Waker::wake
375 pub fn poll_read_ready<'a>(
376 &'a self,
377 cx: &mut Context<'_>,
378 ) -> Poll<io::Result<AsyncFdReadyGuard<'a, T>>> {
379 let event = ready!(self.registration.poll_read_ready(cx))?;
380
381 Poll::Ready(Ok(AsyncFdReadyGuard {
382 async_fd: self,
383 event: Some(event),
384 }))
385 }
386
387 /// Polls for read readiness.
388 ///
389 /// If the file descriptor is not currently ready for reading, this method
390 /// will store a clone of the [`Waker`] from the provided [`Context`]. When the
391 /// file descriptor becomes ready for reading, [`Waker::wake`] will be called.
392 ///
393 /// Note that on multiple calls to [`poll_read_ready`] or
394 /// [`poll_read_ready_mut`], only the `Waker` from the `Context` passed to the
395 /// most recent call is scheduled to receive a wakeup. (However,
396 /// [`poll_write_ready`] retains a second, independent waker).
397 ///
398 /// This method is intended for cases where creating and pinning a future
399 /// via [`readable`] is not feasible. Where possible, using [`readable`] is
400 /// preferred, as this supports polling from multiple tasks at once.
401 ///
402 /// This method takes `&mut self`, so it is possible to access the inner IO
403 /// resource mutably when handling the [`AsyncFdReadyMutGuard`].
404 ///
405 /// [`poll_read_ready`]: method@Self::poll_read_ready
406 /// [`poll_read_ready_mut`]: method@Self::poll_read_ready_mut
407 /// [`poll_write_ready`]: method@Self::poll_write_ready
408 /// [`readable`]: method@Self::readable
409 /// [`Context`]: struct@std::task::Context
410 /// [`Waker`]: struct@std::task::Waker
411 /// [`Waker::wake`]: method@std::task::Waker::wake
412 pub fn poll_read_ready_mut<'a>(
413 &'a mut self,
414 cx: &mut Context<'_>,
415 ) -> Poll<io::Result<AsyncFdReadyMutGuard<'a, T>>> {
416 let event = ready!(self.registration.poll_read_ready(cx))?;
417
418 Poll::Ready(Ok(AsyncFdReadyMutGuard {
419 async_fd: self,
420 event: Some(event),
421 }))
422 }
423
424 /// Polls for write readiness.
425 ///
426 /// If the file descriptor is not currently ready for writing, this method
427 /// will store a clone of the [`Waker`] from the provided [`Context`]. When the
428 /// file descriptor becomes ready for writing, [`Waker::wake`] will be called.
429 ///
430 /// Note that on multiple calls to [`poll_write_ready`] or
431 /// [`poll_write_ready_mut`], only the `Waker` from the `Context` passed to the
432 /// most recent call is scheduled to receive a wakeup. (However,
433 /// [`poll_read_ready`] retains a second, independent waker).
434 ///
435 /// This method is intended for cases where creating and pinning a future
436 /// via [`writable`] is not feasible. Where possible, using [`writable`] is
437 /// preferred, as this supports polling from multiple tasks at once.
438 ///
439 /// This method takes `&self`, so it is possible to call this method
440 /// concurrently with other methods on this struct. This method only
441 /// provides shared access to the inner IO resource when handling the
442 /// [`AsyncFdReadyGuard`].
443 ///
444 /// [`poll_read_ready`]: method@Self::poll_read_ready
445 /// [`poll_write_ready`]: method@Self::poll_write_ready
446 /// [`poll_write_ready_mut`]: method@Self::poll_write_ready_mut
447 /// [`writable`]: method@Self::readable
448 /// [`Context`]: struct@std::task::Context
449 /// [`Waker`]: struct@std::task::Waker
450 /// [`Waker::wake`]: method@std::task::Waker::wake
451 pub fn poll_write_ready<'a>(
452 &'a self,
453 cx: &mut Context<'_>,
454 ) -> Poll<io::Result<AsyncFdReadyGuard<'a, T>>> {
455 let event = ready!(self.registration.poll_write_ready(cx))?;
456
457 Poll::Ready(Ok(AsyncFdReadyGuard {
458 async_fd: self,
459 event: Some(event),
460 }))
461 }
462
463 /// Polls for write readiness.
464 ///
465 /// If the file descriptor is not currently ready for writing, this method
466 /// will store a clone of the [`Waker`] from the provided [`Context`]. When the
467 /// file descriptor becomes ready for writing, [`Waker::wake`] will be called.
468 ///
469 /// Note that on multiple calls to [`poll_write_ready`] or
470 /// [`poll_write_ready_mut`], only the `Waker` from the `Context` passed to the
471 /// most recent call is scheduled to receive a wakeup. (However,
472 /// [`poll_read_ready`] retains a second, independent waker).
473 ///
474 /// This method is intended for cases where creating and pinning a future
475 /// via [`writable`] is not feasible. Where possible, using [`writable`] is
476 /// preferred, as this supports polling from multiple tasks at once.
477 ///
478 /// This method takes `&mut self`, so it is possible to access the inner IO
479 /// resource mutably when handling the [`AsyncFdReadyMutGuard`].
480 ///
481 /// [`poll_read_ready`]: method@Self::poll_read_ready
482 /// [`poll_write_ready`]: method@Self::poll_write_ready
483 /// [`poll_write_ready_mut`]: method@Self::poll_write_ready_mut
484 /// [`writable`]: method@Self::readable
485 /// [`Context`]: struct@std::task::Context
486 /// [`Waker`]: struct@std::task::Waker
487 /// [`Waker::wake`]: method@std::task::Waker::wake
488 pub fn poll_write_ready_mut<'a>(
489 &'a mut self,
490 cx: &mut Context<'_>,
491 ) -> Poll<io::Result<AsyncFdReadyMutGuard<'a, T>>> {
492 let event = ready!(self.registration.poll_write_ready(cx))?;
493
494 Poll::Ready(Ok(AsyncFdReadyMutGuard {
495 async_fd: self,
496 event: Some(event),
497 }))
498 }
499
500 /// Waits for any of the requested ready states, returning a
501 /// [`AsyncFdReadyGuard`] that must be dropped to resume
502 /// polling for the requested ready states.
503 ///
504 /// The function may complete without the file descriptor being ready. This is a
505 /// false-positive and attempting an operation will return with
506 /// `io::ErrorKind::WouldBlock`. The function can also return with an empty
507 /// [`Ready`] set, so you should always check the returned value and possibly
508 /// wait again if the requested states are not set.
509 ///
510 /// When an IO operation does return `io::ErrorKind::WouldBlock`, the readiness must be cleared.
511 /// When a combined interest is used, it is important to clear only the readiness
512 /// that is actually observed to block. For instance when the combined
513 /// interest `Interest::READABLE | Interest::WRITABLE` is used, and a read blocks, only
514 /// read readiness should be cleared using the [`AsyncFdReadyGuard::clear_ready_matching`] method:
515 /// `guard.clear_ready_matching(Ready::READABLE)`.
516 /// Also clearing the write readiness in this case would be incorrect. The [`AsyncFdReadyGuard::clear_ready`]
517 /// method clears all readiness flags.
518 ///
519 /// This method takes `&self`, so it is possible to call this method
520 /// concurrently with other methods on this struct. This method only
521 /// provides shared access to the inner IO resource when handling the
522 /// [`AsyncFdReadyGuard`].
523 ///
524 /// # Examples
525 ///
526 /// Concurrently read and write to a [`std::net::TcpStream`] on the same task without
527 /// splitting.
528 ///
529 /// ```no_run
530 /// use std::error::Error;
531 /// use std::io;
532 /// use std::io::{Read, Write};
533 /// use std::net::TcpStream;
534 /// use tokio::io::unix::AsyncFd;
535 /// use tokio::io::{Interest, Ready};
536 ///
537 /// #[tokio::main]
538 /// async fn main() -> Result<(), Box<dyn Error>> {
539 /// let stream = TcpStream::connect("127.0.0.1:8080")?;
540 /// stream.set_nonblocking(true)?;
541 /// let stream = AsyncFd::new(stream)?;
542 ///
543 /// loop {
544 /// let mut guard = stream
545 /// .ready(Interest::READABLE | Interest::WRITABLE)
546 /// .await?;
547 ///
548 /// if guard.ready().is_readable() {
549 /// let mut data = vec![0; 1024];
550 /// // Try to read data, this may still fail with `WouldBlock`
551 /// // if the readiness event is a false positive.
552 /// match stream.get_ref().read(&mut data) {
553 /// Ok(n) => {
554 /// println!("read {} bytes", n);
555 /// }
556 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
557 /// // a read has blocked, but a write might still succeed.
558 /// // clear only the read readiness.
559 /// guard.clear_ready_matching(Ready::READABLE);
560 /// continue;
561 /// }
562 /// Err(e) => {
563 /// return Err(e.into());
564 /// }
565 /// }
566 /// }
567 ///
568 /// if guard.ready().is_writable() {
569 /// // Try to write data, this may still fail with `WouldBlock`
570 /// // if the readiness event is a false positive.
571 /// match stream.get_ref().write(b"hello world") {
572 /// Ok(n) => {
573 /// println!("write {} bytes", n);
574 /// }
575 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
576 /// // a write has blocked, but a read might still succeed.
577 /// // clear only the write readiness.
578 /// guard.clear_ready_matching(Ready::WRITABLE);
579 /// continue;
580 /// }
581 /// Err(e) => {
582 /// return Err(e.into());
583 /// }
584 /// }
585 /// }
586 /// }
587 /// }
588 /// ```
589 pub async fn ready(&self, interest: Interest) -> io::Result<AsyncFdReadyGuard<'_, T>> {
590 let event = self.registration.readiness(interest).await?;
591
592 Ok(AsyncFdReadyGuard {
593 async_fd: self,
594 event: Some(event),
595 })
596 }
597
598 /// Waits for any of the requested ready states, returning a
599 /// [`AsyncFdReadyMutGuard`] that must be dropped to resume
600 /// polling for the requested ready states.
601 ///
602 /// The function may complete without the file descriptor being ready. This is a
603 /// false-positive and attempting an operation will return with
604 /// `io::ErrorKind::WouldBlock`. The function can also return with an empty
605 /// [`Ready`] set, so you should always check the returned value and possibly
606 /// wait again if the requested states are not set.
607 ///
608 /// When an IO operation does return `io::ErrorKind::WouldBlock`, the readiness must be cleared.
609 /// When a combined interest is used, it is important to clear only the readiness
610 /// that is actually observed to block. For instance when the combined
611 /// interest `Interest::READABLE | Interest::WRITABLE` is used, and a read blocks, only
612 /// read readiness should be cleared using the [`AsyncFdReadyMutGuard::clear_ready_matching`] method:
613 /// `guard.clear_ready_matching(Ready::READABLE)`.
614 /// Also clearing the write readiness in this case would be incorrect.
615 /// The [`AsyncFdReadyMutGuard::clear_ready`] method clears all readiness flags.
616 ///
617 /// This method takes `&mut self`, so it is possible to access the inner IO
618 /// resource mutably when handling the [`AsyncFdReadyMutGuard`].
619 ///
620 /// # Examples
621 ///
622 /// Concurrently read and write to a [`std::net::TcpStream`] on the same task without
623 /// splitting.
624 ///
625 /// ```no_run
626 /// use std::error::Error;
627 /// use std::io;
628 /// use std::io::{Read, Write};
629 /// use std::net::TcpStream;
630 /// use tokio::io::unix::AsyncFd;
631 /// use tokio::io::{Interest, Ready};
632 ///
633 /// #[tokio::main]
634 /// async fn main() -> Result<(), Box<dyn Error>> {
635 /// let stream = TcpStream::connect("127.0.0.1:8080")?;
636 /// stream.set_nonblocking(true)?;
637 /// let mut stream = AsyncFd::new(stream)?;
638 ///
639 /// loop {
640 /// let mut guard = stream
641 /// .ready_mut(Interest::READABLE | Interest::WRITABLE)
642 /// .await?;
643 ///
644 /// if guard.ready().is_readable() {
645 /// let mut data = vec![0; 1024];
646 /// // Try to read data, this may still fail with `WouldBlock`
647 /// // if the readiness event is a false positive.
648 /// match guard.get_inner_mut().read(&mut data) {
649 /// Ok(n) => {
650 /// println!("read {} bytes", n);
651 /// }
652 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
653 /// // a read has blocked, but a write might still succeed.
654 /// // clear only the read readiness.
655 /// guard.clear_ready_matching(Ready::READABLE);
656 /// continue;
657 /// }
658 /// Err(e) => {
659 /// return Err(e.into());
660 /// }
661 /// }
662 /// }
663 ///
664 /// if guard.ready().is_writable() {
665 /// // Try to write data, this may still fail with `WouldBlock`
666 /// // if the readiness event is a false positive.
667 /// match guard.get_inner_mut().write(b"hello world") {
668 /// Ok(n) => {
669 /// println!("write {} bytes", n);
670 /// }
671 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
672 /// // a write has blocked, but a read might still succeed.
673 /// // clear only the write readiness.
674 /// guard.clear_ready_matching(Ready::WRITABLE);
675 /// continue;
676 /// }
677 /// Err(e) => {
678 /// return Err(e.into());
679 /// }
680 /// }
681 /// }
682 /// }
683 /// }
684 /// ```
685 pub async fn ready_mut(
686 &mut self,
687 interest: Interest,
688 ) -> io::Result<AsyncFdReadyMutGuard<'_, T>> {
689 let event = self.registration.readiness(interest).await?;
690
691 Ok(AsyncFdReadyMutGuard {
692 async_fd: self,
693 event: Some(event),
694 })
695 }
696
697 /// Waits for the file descriptor to become readable, returning a
698 /// [`AsyncFdReadyGuard`] that must be dropped to resume read-readiness
699 /// polling.
700 ///
701 /// This method takes `&self`, so it is possible to call this method
702 /// concurrently with other methods on this struct. This method only
703 /// provides shared access to the inner IO resource when handling the
704 /// [`AsyncFdReadyGuard`].
705 ///
706 /// # Cancel safety
707 ///
708 /// This method is cancel safe. Once a readiness event occurs, the method
709 /// will continue to return immediately until the readiness event is
710 /// consumed by an attempt to read or write that fails with `WouldBlock` or
711 /// `Poll::Pending`.
712 #[allow(clippy::needless_lifetimes)] // The lifetime improves rustdoc rendering.
713 pub async fn readable<'a>(&'a self) -> io::Result<AsyncFdReadyGuard<'a, T>> {
714 self.ready(Interest::READABLE).await
715 }
716
717 /// Waits for the file descriptor to become readable, returning a
718 /// [`AsyncFdReadyMutGuard`] that must be dropped to resume read-readiness
719 /// polling.
720 ///
721 /// This method takes `&mut self`, so it is possible to access the inner IO
722 /// resource mutably when handling the [`AsyncFdReadyMutGuard`].
723 ///
724 /// # Cancel safety
725 ///
726 /// This method is cancel safe. Once a readiness event occurs, the method
727 /// will continue to return immediately until the readiness event is
728 /// consumed by an attempt to read or write that fails with `WouldBlock` or
729 /// `Poll::Pending`.
730 #[allow(clippy::needless_lifetimes)] // The lifetime improves rustdoc rendering.
731 pub async fn readable_mut<'a>(&'a mut self) -> io::Result<AsyncFdReadyMutGuard<'a, T>> {
732 self.ready_mut(Interest::READABLE).await
733 }
734
735 /// Waits for the file descriptor to become writable, returning a
736 /// [`AsyncFdReadyGuard`] that must be dropped to resume write-readiness
737 /// polling.
738 ///
739 /// This method takes `&self`, so it is possible to call this method
740 /// concurrently with other methods on this struct. This method only
741 /// provides shared access to the inner IO resource when handling the
742 /// [`AsyncFdReadyGuard`].
743 ///
744 /// # Cancel safety
745 ///
746 /// This method is cancel safe. Once a readiness event occurs, the method
747 /// will continue to return immediately until the readiness event is
748 /// consumed by an attempt to read or write that fails with `WouldBlock` or
749 /// `Poll::Pending`.
750 #[allow(clippy::needless_lifetimes)] // The lifetime improves rustdoc rendering.
751 pub async fn writable<'a>(&'a self) -> io::Result<AsyncFdReadyGuard<'a, T>> {
752 self.ready(Interest::WRITABLE).await
753 }
754
755 /// Waits for the file descriptor to become writable, returning a
756 /// [`AsyncFdReadyMutGuard`] that must be dropped to resume write-readiness
757 /// polling.
758 ///
759 /// This method takes `&mut self`, so it is possible to access the inner IO
760 /// resource mutably when handling the [`AsyncFdReadyMutGuard`].
761 ///
762 /// # Cancel safety
763 ///
764 /// This method is cancel safe. Once a readiness event occurs, the method
765 /// will continue to return immediately until the readiness event is
766 /// consumed by an attempt to read or write that fails with `WouldBlock` or
767 /// `Poll::Pending`.
768 #[allow(clippy::needless_lifetimes)] // The lifetime improves rustdoc rendering.
769 pub async fn writable_mut<'a>(&'a mut self) -> io::Result<AsyncFdReadyMutGuard<'a, T>> {
770 self.ready_mut(Interest::WRITABLE).await
771 }
772
773 /// Reads or writes from the file descriptor using a user-provided IO operation.
774 ///
775 /// The `async_io` method is a convenience utility that waits for the file
776 /// descriptor to become ready, and then executes the provided IO operation.
777 /// Since file descriptors may be marked ready spuriously, the closure will
778 /// be called repeatedly until it returns something other than a
779 /// [`WouldBlock`] error. This is done using the following loop:
780 ///
781 /// ```no_run
782 /// # use std::io::{self, Result};
783 /// # struct Dox<T> { inner: T }
784 /// # impl<T> Dox<T> {
785 /// # async fn writable(&self) -> Result<&Self> {
786 /// # Ok(self)
787 /// # }
788 /// # fn try_io<R>(&self, _: impl FnMut(&T) -> Result<R>) -> Result<Result<R>> {
789 /// # panic!()
790 /// # }
791 /// async fn async_io<R>(&self, mut f: impl FnMut(&T) -> io::Result<R>) -> io::Result<R> {
792 /// loop {
793 /// // or `readable` if called with the read interest.
794 /// let guard = self.writable().await?;
795 ///
796 /// match guard.try_io(&mut f) {
797 /// Ok(result) => return result,
798 /// Err(_would_block) => continue,
799 /// }
800 /// }
801 /// }
802 /// # }
803 /// ```
804 ///
805 /// The closure should only return a [`WouldBlock`] error if it has performed
806 /// an IO operation on the file descriptor that failed due to the file descriptor not being
807 /// ready. Returning a [`WouldBlock`] error in any other situation will
808 /// incorrectly clear the readiness flag, which can cause the file descriptor to
809 /// behave incorrectly.
810 ///
811 /// The closure should not perform the IO operation using any of the methods
812 /// defined on the Tokio [`AsyncFd`] type, as this will mess with the
813 /// readiness flag and can cause the file descriptor to behave incorrectly.
814 ///
815 /// This method is not intended to be used with combined interests.
816 /// The closure should perform only one type of IO operation, so it should not
817 /// require more than one ready state. This method may panic or sleep forever
818 /// if it is called with a combined interest.
819 ///
820 /// # Examples
821 ///
822 /// This example sends some bytes on the inner [`std::net::UdpSocket`]. The `async_io`
823 /// method waits for readiness, and retries if the send operation does block. This example
824 /// is equivalent to the one given for [`try_io`].
825 ///
826 /// ```no_run
827 /// use tokio::io::{Interest, unix::AsyncFd};
828 ///
829 /// use std::io;
830 /// use std::net::UdpSocket;
831 ///
832 /// #[tokio::main]
833 /// async fn main() -> io::Result<()> {
834 /// let socket = UdpSocket::bind("0.0.0.0:8080")?;
835 /// socket.set_nonblocking(true)?;
836 /// let async_fd = AsyncFd::new(socket)?;
837 ///
838 /// let written = async_fd
839 /// .async_io(Interest::WRITABLE, |inner| inner.send(&[1, 2]))
840 /// .await?;
841 ///
842 /// println!("wrote {written} bytes");
843 ///
844 /// Ok(())
845 /// }
846 /// ```
847 ///
848 /// [`try_io`]: AsyncFdReadyGuard::try_io
849 /// [`WouldBlock`]: std::io::ErrorKind::WouldBlock
850 pub async fn async_io<R>(
851 &self,
852 interest: Interest,
853 mut f: impl FnMut(&T) -> io::Result<R>,
854 ) -> io::Result<R> {
855 self.registration
856 .async_io(interest, || f(self.get_ref()))
857 .await
858 }
859
860 /// Reads or writes from the file descriptor using a user-provided IO operation.
861 ///
862 /// The behavior is the same as [`async_io`], except that the closure can mutate the inner
863 /// value of the [`AsyncFd`].
864 ///
865 /// [`async_io`]: AsyncFd::async_io
866 pub async fn async_io_mut<R>(
867 &mut self,
868 interest: Interest,
869 mut f: impl FnMut(&mut T) -> io::Result<R>,
870 ) -> io::Result<R> {
871 self.registration
872 .async_io(interest, || f(self.inner.as_mut().unwrap()))
873 .await
874 }
875}
876
877impl<T: AsRawFd> AsRawFd for AsyncFd<T> {
878 fn as_raw_fd(&self) -> RawFd {
879 self.inner.as_ref().unwrap().as_raw_fd()
880 }
881}
882
883impl<T: AsRawFd> std::os::unix::io::AsFd for AsyncFd<T> {
884 fn as_fd(&self) -> std::os::unix::io::BorrowedFd<'_> {
885 unsafe { std::os::unix::io::BorrowedFd::borrow_raw(self.as_raw_fd()) }
886 }
887}
888
889impl<T: std::fmt::Debug + AsRawFd> std::fmt::Debug for AsyncFd<T> {
890 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
891 f.debug_struct("AsyncFd")
892 .field("inner", &self.inner)
893 .finish()
894 }
895}
896
897impl<T: AsRawFd> Drop for AsyncFd<T> {
898 fn drop(&mut self) {
899 let _ = self.take_inner();
900 }
901}
902
903impl<'a, Inner: AsRawFd> AsyncFdReadyGuard<'a, Inner> {
904 /// Indicates to tokio that the file descriptor is no longer ready. All
905 /// internal readiness flags will be cleared, and tokio will wait for the
906 /// next edge-triggered readiness notification from the OS.
907 ///
908 /// This function is commonly used with guards returned by [`AsyncFd::readable`] and
909 /// [`AsyncFd::writable`].
910 ///
911 /// It is critical that this function not be called unless your code
912 /// _actually observes_ that the file descriptor is _not_ ready. Do not call
913 /// it simply because, for example, a read succeeded; it should be called
914 /// when a read is observed to block.
915 ///
916 /// This method only clears readiness events that happened before the creation of this guard.
917 /// In other words, if the IO resource becomes ready between the creation of the guard and
918 /// this call to `clear_ready`, then the readiness is not actually cleared.
919 pub fn clear_ready(&mut self) {
920 if let Some(event) = self.event.take() {
921 self.async_fd.registration.clear_readiness(event);
922 }
923 }
924
925 /// Indicates to tokio that the file descriptor no longer has a specific readiness.
926 /// The internal readiness flag will be cleared, and tokio will wait for the
927 /// next edge-triggered readiness notification from the OS.
928 ///
929 /// This function is useful in combination with the [`AsyncFd::ready`] method when a
930 /// combined interest like `Interest::READABLE | Interest::WRITABLE` is used.
931 ///
932 /// It is critical that this function not be called unless your code
933 /// _actually observes_ that the file descriptor is _not_ ready for the provided `Ready`.
934 /// Do not call it simply because, for example, a read succeeded; it should be called
935 /// when a read is observed to block. Only clear the specific readiness that is observed to
936 /// block. For example when a read blocks when using a combined interest,
937 /// only clear `Ready::READABLE`.
938 ///
939 /// This method only clears readiness events that happened before the creation of this guard.
940 /// In other words, if the IO resource becomes ready between the creation of the guard and
941 /// this call to `clear_ready`, then the readiness is not actually cleared.
942 ///
943 /// # Examples
944 ///
945 /// Concurrently read and write to a [`std::net::TcpStream`] on the same task without
946 /// splitting.
947 ///
948 /// ```no_run
949 /// use std::error::Error;
950 /// use std::io;
951 /// use std::io::{Read, Write};
952 /// use std::net::TcpStream;
953 /// use tokio::io::unix::AsyncFd;
954 /// use tokio::io::{Interest, Ready};
955 ///
956 /// #[tokio::main]
957 /// async fn main() -> Result<(), Box<dyn Error>> {
958 /// let stream = TcpStream::connect("127.0.0.1:8080")?;
959 /// stream.set_nonblocking(true)?;
960 /// let stream = AsyncFd::new(stream)?;
961 ///
962 /// loop {
963 /// let mut guard = stream
964 /// .ready(Interest::READABLE | Interest::WRITABLE)
965 /// .await?;
966 ///
967 /// if guard.ready().is_readable() {
968 /// let mut data = vec![0; 1024];
969 /// // Try to read data, this may still fail with `WouldBlock`
970 /// // if the readiness event is a false positive.
971 /// match stream.get_ref().read(&mut data) {
972 /// Ok(n) => {
973 /// println!("read {} bytes", n);
974 /// }
975 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
976 /// // a read has blocked, but a write might still succeed.
977 /// // clear only the read readiness.
978 /// guard.clear_ready_matching(Ready::READABLE);
979 /// continue;
980 /// }
981 /// Err(e) => {
982 /// return Err(e.into());
983 /// }
984 /// }
985 /// }
986 ///
987 /// if guard.ready().is_writable() {
988 /// // Try to write data, this may still fail with `WouldBlock`
989 /// // if the readiness event is a false positive.
990 /// match stream.get_ref().write(b"hello world") {
991 /// Ok(n) => {
992 /// println!("write {} bytes", n);
993 /// }
994 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
995 /// // a write has blocked, but a read might still succeed.
996 /// // clear only the write readiness.
997 /// guard.clear_ready_matching(Ready::WRITABLE);
998 /// continue;
999 /// }
1000 /// Err(e) => {
1001 /// return Err(e.into());
1002 /// }
1003 /// }
1004 /// }
1005 /// }
1006 /// }
1007 /// ```
1008 pub fn clear_ready_matching(&mut self, ready: Ready) {
1009 if let Some(mut event) = self.event.take() {
1010 self.async_fd
1011 .registration
1012 .clear_readiness(event.with_ready(ready));
1013
1014 // the event is no longer ready for the readiness that was just cleared
1015 event.ready = event.ready - ready;
1016
1017 if !event.ready.is_empty() {
1018 self.event = Some(event);
1019 }
1020 }
1021 }
1022
1023 /// This method should be invoked when you intentionally want to keep the
1024 /// ready flag asserted.
1025 ///
1026 /// While this function is itself a no-op, it satisfies the `#[must_use]`
1027 /// constraint on the [`AsyncFdReadyGuard`] type.
1028 pub fn retain_ready(&mut self) {
1029 // no-op
1030 }
1031
1032 /// Get the [`Ready`] value associated with this guard.
1033 ///
1034 /// This method will return the empty readiness state if
1035 /// [`AsyncFdReadyGuard::clear_ready`] has been called on
1036 /// the guard.
1037 ///
1038 /// [`Ready`]: crate::io::Ready
1039 pub fn ready(&self) -> Ready {
1040 match &self.event {
1041 Some(event) => event.ready,
1042 None => Ready::EMPTY,
1043 }
1044 }
1045
1046 /// Performs the provided IO operation.
1047 ///
1048 /// If `f` returns a [`WouldBlock`] error, the readiness state associated
1049 /// with this file descriptor is cleared, and the method returns
1050 /// `Err(TryIoError::WouldBlock)`. You will typically need to poll the
1051 /// `AsyncFd` again when this happens.
1052 ///
1053 /// This method helps ensure that the readiness state of the underlying file
1054 /// descriptor remains in sync with the tokio-side readiness state, by
1055 /// clearing the tokio-side state only when a [`WouldBlock`] condition
1056 /// occurs. It is the responsibility of the caller to ensure that `f`
1057 /// returns [`WouldBlock`] only if the file descriptor that originated this
1058 /// `AsyncFdReadyGuard` no longer expresses the readiness state that was queried to
1059 /// create this `AsyncFdReadyGuard`.
1060 ///
1061 /// # Examples
1062 ///
1063 /// This example sends some bytes to the inner [`std::net::UdpSocket`]. Waiting
1064 /// for write-readiness and retrying when the send operation does block are explicit.
1065 /// This example can be written more succinctly using [`AsyncFd::async_io`].
1066 ///
1067 /// ```no_run
1068 /// use tokio::io::unix::AsyncFd;
1069 ///
1070 /// use std::io;
1071 /// use std::net::UdpSocket;
1072 ///
1073 /// #[tokio::main]
1074 /// async fn main() -> io::Result<()> {
1075 /// let socket = UdpSocket::bind("0.0.0.0:8080")?;
1076 /// socket.set_nonblocking(true)?;
1077 /// let async_fd = AsyncFd::new(socket)?;
1078 ///
1079 /// let written = loop {
1080 /// let mut guard = async_fd.writable().await?;
1081 /// match guard.try_io(|inner| inner.get_ref().send(&[1, 2])) {
1082 /// Ok(result) => {
1083 /// break result?;
1084 /// }
1085 /// Err(_would_block) => {
1086 /// // try_io already cleared the file descriptor's readiness state
1087 /// continue;
1088 /// }
1089 /// }
1090 /// };
1091 ///
1092 /// println!("wrote {written} bytes");
1093 ///
1094 /// Ok(())
1095 /// }
1096 /// ```
1097 ///
1098 /// [`WouldBlock`]: std::io::ErrorKind::WouldBlock
1099 // Alias for old name in 0.x
1100 #[cfg_attr(docsrs, doc(alias = "with_io"))]
1101 pub fn try_io<R>(
1102 &mut self,
1103 f: impl FnOnce(&'a AsyncFd<Inner>) -> io::Result<R>,
1104 ) -> Result<io::Result<R>, TryIoError> {
1105 let result = f(self.async_fd);
1106
1107 match result {
1108 Err(err) if err.kind() == io::ErrorKind::WouldBlock => {
1109 self.clear_ready();
1110 Err(TryIoError(()))
1111 }
1112 result => Ok(result),
1113 }
1114 }
1115
1116 /// Returns a shared reference to the inner [`AsyncFd`].
1117 pub fn get_ref(&self) -> &'a AsyncFd<Inner> {
1118 self.async_fd
1119 }
1120
1121 /// Returns a shared reference to the backing object of the inner [`AsyncFd`].
1122 pub fn get_inner(&self) -> &'a Inner {
1123 self.get_ref().get_ref()
1124 }
1125}
1126
1127impl<'a, Inner: AsRawFd> AsyncFdReadyMutGuard<'a, Inner> {
1128 /// Indicates to tokio that the file descriptor is no longer ready. All
1129 /// internal readiness flags will be cleared, and tokio will wait for the
1130 /// next edge-triggered readiness notification from the OS.
1131 ///
1132 /// This function is commonly used with guards returned by [`AsyncFd::readable_mut`] and
1133 /// [`AsyncFd::writable_mut`].
1134 ///
1135 /// It is critical that this function not be called unless your code
1136 /// _actually observes_ that the file descriptor is _not_ ready. Do not call
1137 /// it simply because, for example, a read succeeded; it should be called
1138 /// when a read is observed to block.
1139 ///
1140 /// This method only clears readiness events that happened before the creation of this guard.
1141 /// In other words, if the IO resource becomes ready between the creation of the guard and
1142 /// this call to `clear_ready`, then the readiness is not actually cleared.
1143 pub fn clear_ready(&mut self) {
1144 if let Some(event) = self.event.take() {
1145 self.async_fd.registration.clear_readiness(event);
1146 }
1147 }
1148
1149 /// Indicates to tokio that the file descriptor no longer has a specific readiness.
1150 /// The internal readiness flag will be cleared, and tokio will wait for the
1151 /// next edge-triggered readiness notification from the OS.
1152 ///
1153 /// This function is useful in combination with the [`AsyncFd::ready_mut`] method when a
1154 /// combined interest like `Interest::READABLE | Interest::WRITABLE` is used.
1155 ///
1156 /// It is critical that this function not be called unless your code
1157 /// _actually observes_ that the file descriptor is _not_ ready for the provided `Ready`.
1158 /// Do not call it simply because, for example, a read succeeded; it should be called
1159 /// when a read is observed to block. Only clear the specific readiness that is observed to
1160 /// block. For example when a read blocks when using a combined interest,
1161 /// only clear `Ready::READABLE`.
1162 ///
1163 /// This method only clears readiness events that happened before the creation of this guard.
1164 /// In other words, if the IO resource becomes ready between the creation of the guard and
1165 /// this call to `clear_ready`, then the readiness is not actually cleared.
1166 ///
1167 /// # Examples
1168 ///
1169 /// Concurrently read and write to a [`std::net::TcpStream`] on the same task without
1170 /// splitting.
1171 ///
1172 /// ```no_run
1173 /// use std::error::Error;
1174 /// use std::io;
1175 /// use std::io::{Read, Write};
1176 /// use std::net::TcpStream;
1177 /// use tokio::io::unix::AsyncFd;
1178 /// use tokio::io::{Interest, Ready};
1179 ///
1180 /// #[tokio::main]
1181 /// async fn main() -> Result<(), Box<dyn Error>> {
1182 /// let stream = TcpStream::connect("127.0.0.1:8080")?;
1183 /// stream.set_nonblocking(true)?;
1184 /// let mut stream = AsyncFd::new(stream)?;
1185 ///
1186 /// loop {
1187 /// let mut guard = stream
1188 /// .ready_mut(Interest::READABLE | Interest::WRITABLE)
1189 /// .await?;
1190 ///
1191 /// if guard.ready().is_readable() {
1192 /// let mut data = vec![0; 1024];
1193 /// // Try to read data, this may still fail with `WouldBlock`
1194 /// // if the readiness event is a false positive.
1195 /// match guard.get_inner_mut().read(&mut data) {
1196 /// Ok(n) => {
1197 /// println!("read {} bytes", n);
1198 /// }
1199 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
1200 /// // a read has blocked, but a write might still succeed.
1201 /// // clear only the read readiness.
1202 /// guard.clear_ready_matching(Ready::READABLE);
1203 /// continue;
1204 /// }
1205 /// Err(e) => {
1206 /// return Err(e.into());
1207 /// }
1208 /// }
1209 /// }
1210 ///
1211 /// if guard.ready().is_writable() {
1212 /// // Try to write data, this may still fail with `WouldBlock`
1213 /// // if the readiness event is a false positive.
1214 /// match guard.get_inner_mut().write(b"hello world") {
1215 /// Ok(n) => {
1216 /// println!("write {} bytes", n);
1217 /// }
1218 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
1219 /// // a write has blocked, but a read might still succeed.
1220 /// // clear only the write readiness.
1221 /// guard.clear_ready_matching(Ready::WRITABLE);
1222 /// continue;
1223 /// }
1224 /// Err(e) => {
1225 /// return Err(e.into());
1226 /// }
1227 /// }
1228 /// }
1229 /// }
1230 /// }
1231 /// ```
1232 pub fn clear_ready_matching(&mut self, ready: Ready) {
1233 if let Some(mut event) = self.event.take() {
1234 self.async_fd
1235 .registration
1236 .clear_readiness(event.with_ready(ready));
1237
1238 // the event is no longer ready for the readiness that was just cleared
1239 event.ready = event.ready - ready;
1240
1241 if !event.ready.is_empty() {
1242 self.event = Some(event);
1243 }
1244 }
1245 }
1246
1247 /// This method should be invoked when you intentionally want to keep the
1248 /// ready flag asserted.
1249 ///
1250 /// While this function is itself a no-op, it satisfies the `#[must_use]`
1251 /// constraint on the [`AsyncFdReadyGuard`] type.
1252 pub fn retain_ready(&mut self) {
1253 // no-op
1254 }
1255
1256 /// Get the [`Ready`] value associated with this guard.
1257 ///
1258 /// This method will return the empty readiness state if
1259 /// [`AsyncFdReadyGuard::clear_ready`] has been called on
1260 /// the guard.
1261 ///
1262 /// [`Ready`]: super::Ready
1263 pub fn ready(&self) -> Ready {
1264 match &self.event {
1265 Some(event) => event.ready,
1266 None => Ready::EMPTY,
1267 }
1268 }
1269
1270 /// Performs the provided IO operation.
1271 ///
1272 /// If `f` returns a [`WouldBlock`] error, the readiness state associated
1273 /// with this file descriptor is cleared, and the method returns
1274 /// `Err(TryIoError::WouldBlock)`. You will typically need to poll the
1275 /// `AsyncFd` again when this happens.
1276 ///
1277 /// This method helps ensure that the readiness state of the underlying file
1278 /// descriptor remains in sync with the tokio-side readiness state, by
1279 /// clearing the tokio-side state only when a [`WouldBlock`] condition
1280 /// occurs. It is the responsibility of the caller to ensure that `f`
1281 /// returns [`WouldBlock`] only if the file descriptor that originated this
1282 /// `AsyncFdReadyGuard` no longer expresses the readiness state that was queried to
1283 /// create this `AsyncFdReadyGuard`.
1284 ///
1285 /// [`WouldBlock`]: std::io::ErrorKind::WouldBlock
1286 pub fn try_io<R>(
1287 &mut self,
1288 f: impl FnOnce(&mut AsyncFd<Inner>) -> io::Result<R>,
1289 ) -> Result<io::Result<R>, TryIoError> {
1290 let result = f(self.async_fd);
1291
1292 match result {
1293 Err(err) if err.kind() == io::ErrorKind::WouldBlock => {
1294 self.clear_ready();
1295 Err(TryIoError(()))
1296 }
1297 result => Ok(result),
1298 }
1299 }
1300
1301 /// Returns a shared reference to the inner [`AsyncFd`].
1302 pub fn get_ref(&self) -> &AsyncFd<Inner> {
1303 self.async_fd
1304 }
1305
1306 /// Returns a mutable reference to the inner [`AsyncFd`].
1307 pub fn get_mut(&mut self) -> &mut AsyncFd<Inner> {
1308 self.async_fd
1309 }
1310
1311 /// Returns a shared reference to the backing object of the inner [`AsyncFd`].
1312 pub fn get_inner(&self) -> &Inner {
1313 self.get_ref().get_ref()
1314 }
1315
1316 /// Returns a mutable reference to the backing object of the inner [`AsyncFd`].
1317 pub fn get_inner_mut(&mut self) -> &mut Inner {
1318 self.get_mut().get_mut()
1319 }
1320}
1321
1322impl<'a, T: std::fmt::Debug + AsRawFd> std::fmt::Debug for AsyncFdReadyGuard<'a, T> {
1323 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1324 f.debug_struct("ReadyGuard")
1325 .field("async_fd", &self.async_fd)
1326 .finish()
1327 }
1328}
1329
1330impl<'a, T: std::fmt::Debug + AsRawFd> std::fmt::Debug for AsyncFdReadyMutGuard<'a, T> {
1331 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1332 f.debug_struct("MutReadyGuard")
1333 .field("async_fd", &self.async_fd)
1334 .finish()
1335 }
1336}
1337
1338/// The error type returned by [`try_io`].
1339///
1340/// This error indicates that the IO resource returned a [`WouldBlock`] error.
1341///
1342/// [`WouldBlock`]: std::io::ErrorKind::WouldBlock
1343/// [`try_io`]: method@AsyncFdReadyGuard::try_io
1344#[derive(Debug)]
1345pub struct TryIoError(());
1346
1347/// Error returned by [`try_new`] or [`try_with_interest`].
1348///
1349/// [`try_new`]: AsyncFd::try_new
1350/// [`try_with_interest`]: AsyncFd::try_with_interest
1351pub struct AsyncFdTryNewError<T> {
1352 inner: T,
1353 cause: io::Error,
1354}
1355
1356impl<T> AsyncFdTryNewError<T> {
1357 /// Returns the original object passed to [`try_new`] or [`try_with_interest`]
1358 /// alongside the error that caused these functions to fail.
1359 ///
1360 /// [`try_new`]: AsyncFd::try_new
1361 /// [`try_with_interest`]: AsyncFd::try_with_interest
1362 pub fn into_parts(self) -> (T, io::Error) {
1363 (self.inner, self.cause)
1364 }
1365}
1366
1367impl<T> fmt::Display for AsyncFdTryNewError<T> {
1368 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1369 fmt::Display::fmt(&self.cause, f)
1370 }
1371}
1372
1373impl<T> fmt::Debug for AsyncFdTryNewError<T> {
1374 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1375 fmt::Debug::fmt(&self.cause, f)
1376 }
1377}
1378
1379impl<T> Error for AsyncFdTryNewError<T> {
1380 fn source(&self) -> Option<&(dyn Error + 'static)> {
1381 Some(&self.cause)
1382 }
1383}
1384
1385impl<T> From<AsyncFdTryNewError<T>> for io::Error {
1386 fn from(value: AsyncFdTryNewError<T>) -> Self {
1387 value.cause
1388 }
1389}