1use crate::io::util::{BufReader, BufWriter};
2use crate::io::{AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite, ReadBuf};
34use pin_project_lite::pin_project;
5use std::io::{self, IoSlice, SeekFrom};
6use std::pin::Pin;
7use std::task::{Context, Poll};
89pin_project! {
10/// Wraps a type that is [`AsyncWrite`] and [`AsyncRead`], and buffers its input and output.
11 ///
12 /// It can be excessively inefficient to work directly with something that implements [`AsyncWrite`]
13 /// and [`AsyncRead`]. For example, every `write`, however small, has to traverse the syscall
14 /// interface, and similarly, every read has to do the same. The [`BufWriter`] and [`BufReader`]
15 /// types aid with these problems respectively, but do so in only one direction. `BufStream` wraps
16 /// one in the other so that both directions are buffered. See their documentation for details.
17#[derive(Debug)]
18 #[cfg_attr(docsrs, doc(cfg(feature = "io-util")))]
19pub struct BufStream<RW> {
20#[pin]
21inner: BufReader<BufWriter<RW>>,
22 }
23}
2425impl<RW: AsyncRead + AsyncWrite> BufStream<RW> {
26/// Wraps a type in both [`BufWriter`] and [`BufReader`].
27 ///
28 /// See the documentation for those types and [`BufStream`] for details.
29pub fn new(stream: RW) -> BufStream<RW> {
30 BufStream {
31 inner: BufReader::new(BufWriter::new(stream)),
32 }
33 }
3435/// Creates a `BufStream` with the specified [`BufReader`] capacity and [`BufWriter`]
36 /// capacity.
37 ///
38 /// See the documentation for those types and [`BufStream`] for details.
39pub fn with_capacity(
40 reader_capacity: usize,
41 writer_capacity: usize,
42 stream: RW,
43 ) -> BufStream<RW> {
44 BufStream {
45 inner: BufReader::with_capacity(
46 reader_capacity,
47 BufWriter::with_capacity(writer_capacity, stream),
48 ),
49 }
50 }
5152/// Gets a reference to the underlying I/O object.
53 ///
54 /// It is inadvisable to directly read from the underlying I/O object.
55pub fn get_ref(&self) -> &RW {
56self.inner.get_ref().get_ref()
57 }
5859/// Gets a mutable reference to the underlying I/O object.
60 ///
61 /// It is inadvisable to directly read from the underlying I/O object.
62pub fn get_mut(&mut self) -> &mut RW {
63self.inner.get_mut().get_mut()
64 }
6566/// Gets a pinned mutable reference to the underlying I/O object.
67 ///
68 /// It is inadvisable to directly read from the underlying I/O object.
69pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut RW> {
70self.project().inner.get_pin_mut().get_pin_mut()
71 }
7273/// Consumes this `BufStream`, returning the underlying I/O object.
74 ///
75 /// Note that any leftover data in the internal buffer is lost.
76pub fn into_inner(self) -> RW {
77self.inner.into_inner().into_inner()
78 }
79}
8081impl<RW> From<BufReader<BufWriter<RW>>> for BufStream<RW> {
82fn from(b: BufReader<BufWriter<RW>>) -> Self {
83 BufStream { inner: b }
84 }
85}
8687impl<RW> From<BufWriter<BufReader<RW>>> for BufStream<RW> {
88fn from(b: BufWriter<BufReader<RW>>) -> Self {
89// we need to "invert" the reader and writer
90let BufWriter {
91 inner:
92 BufReader {
93 inner,
94 buf: rbuf,
95 pos,
96 cap,
97 seek_state: rseek_state,
98 },
99 buf: wbuf,
100 written,
101 seek_state: wseek_state,
102 } = b;
103104 BufStream {
105 inner: BufReader {
106 inner: BufWriter {
107 inner,
108 buf: wbuf,
109 written,
110 seek_state: wseek_state,
111 },
112 buf: rbuf,
113 pos,
114 cap,
115 seek_state: rseek_state,
116 },
117 }
118 }
119}
120121impl<RW: AsyncRead + AsyncWrite> AsyncWrite for BufStream<RW> {
122fn poll_write(
123self: Pin<&mut Self>,
124 cx: &mut Context<'_>,
125 buf: &[u8],
126 ) -> Poll<io::Result<usize>> {
127self.project().inner.poll_write(cx, buf)
128 }
129130fn poll_write_vectored(
131self: Pin<&mut Self>,
132 cx: &mut Context<'_>,
133 bufs: &[IoSlice<'_>],
134 ) -> Poll<io::Result<usize>> {
135self.project().inner.poll_write_vectored(cx, bufs)
136 }
137138fn is_write_vectored(&self) -> bool {
139self.inner.is_write_vectored()
140 }
141142fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
143self.project().inner.poll_flush(cx)
144 }
145146fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
147self.project().inner.poll_shutdown(cx)
148 }
149}
150151impl<RW: AsyncRead + AsyncWrite> AsyncRead for BufStream<RW> {
152fn poll_read(
153self: Pin<&mut Self>,
154 cx: &mut Context<'_>,
155 buf: &mut ReadBuf<'_>,
156 ) -> Poll<io::Result<()>> {
157self.project().inner.poll_read(cx, buf)
158 }
159}
160161/// Seek to an offset, in bytes, in the underlying stream.
162///
163/// The position used for seeking with `SeekFrom::Current(_)` is the
164/// position the underlying stream would be at if the `BufStream` had no
165/// internal buffer.
166///
167/// Seeking always discards the internal buffer, even if the seek position
168/// would otherwise fall within it. This guarantees that calling
169/// `.into_inner()` immediately after a seek yields the underlying reader
170/// at the same position.
171///
172/// See [`AsyncSeek`] for more details.
173///
174/// Note: In the edge case where you're seeking with `SeekFrom::Current(n)`
175/// where `n` minus the internal buffer length overflows an `i64`, two
176/// seeks will be performed instead of one. If the second seek returns
177/// `Err`, the underlying reader will be left at the same position it would
178/// have if you called `seek` with `SeekFrom::Current(0)`.
179impl<RW: AsyncRead + AsyncWrite + AsyncSeek> AsyncSeek for BufStream<RW> {
180fn start_seek(self: Pin<&mut Self>, position: SeekFrom) -> io::Result<()> {
181self.project().inner.start_seek(position)
182 }
183184fn poll_complete(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<u64>> {
185self.project().inner.poll_complete(cx)
186 }
187}
188189impl<RW: AsyncRead + AsyncWrite> AsyncBufRead for BufStream<RW> {
190fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
191self.project().inner.poll_fill_buf(cx)
192 }
193194fn consume(self: Pin<&mut Self>, amt: usize) {
195self.project().inner.consume(amt);
196 }
197}
#[cfg(test)]
mod tests {
    use super::BufStream;

    // `crate::is_unpin` is defined elsewhere in the crate; judging by its name it
    // only type-checks when its type argument is `Unpin` (confirm against its
    // definition). The test body performs no runtime work beyond that call.
    #[test]
    fn assert_unpin() {
        crate::is_unpin::<BufStream<()>>();
    }
}