tokio/io/util/
read_exact.rs

1use crate::io::{AsyncRead, ReadBuf};
2
3use pin_project_lite::pin_project;
4use std::future::Future;
5use std::io;
6use std::marker::PhantomPinned;
7use std::marker::Unpin;
8use std::pin::Pin;
9use std::task::{ready, Context, Poll};
10
11/// A future which can be used to easily read exactly enough bytes to fill
12/// a buffer.
13///
14/// Created by the [`AsyncReadExt::read_exact`][read_exact].
15/// [`read_exact`]: [`crate::io::AsyncReadExt::read_exact`]
16pub(crate) fn read_exact<'a, A>(reader: &'a mut A, buf: &'a mut [u8]) -> ReadExact<'a, A>
17where
18    A: AsyncRead + Unpin + ?Sized,
19{
20    ReadExact {
21        reader,
22        buf: ReadBuf::new(buf),
23        _pin: PhantomPinned,
24    }
25}
26
pin_project! {
    /// Future which reads exactly enough bytes to fill `buf`, resolving to
    /// an error if EOF is hit sooner.
    ///
    /// On success the future resolves to the number of bytes read, which is
    /// the full capacity of the buffer it was created with.
    #[derive(Debug)]
    #[must_use = "futures do nothing unless you `.await` or poll them"]
    pub struct ReadExact<'a, A: ?Sized> {
        // Source the bytes are read from.
        reader: &'a mut A,
        // Destination buffer; its remaining space shrinks as reads complete.
        buf: ReadBuf<'a>,
        // Make this future `!Unpin` for compatibility with async trait methods.
        #[pin]
        _pin: PhantomPinned,
    }
}
42
/// Builds the error returned when the reader runs dry before the buffer has
/// been completely filled.
fn eof() -> io::Error {
    let kind = io::ErrorKind::UnexpectedEof;
    io::Error::new(kind, "early eof")
}
46
47impl<A> Future for ReadExact<'_, A>
48where
49    A: AsyncRead + Unpin + ?Sized,
50{
51    type Output = io::Result<usize>;
52
53    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<usize>> {
54        let me = self.project();
55
56        loop {
57            // if our buffer is empty, then we need to read some data to continue.
58            let rem = me.buf.remaining();
59            if rem != 0 {
60                ready!(Pin::new(&mut *me.reader).poll_read(cx, me.buf))?;
61                if me.buf.remaining() == rem {
62                    return Err(eof()).into();
63                }
64            } else {
65                return Poll::Ready(Ok(me.buf.capacity()));
66            }
67        }
68    }
69}