flate2/
zio.rs

1use std::io;
2use std::io::prelude::*;
3use std::mem;
4
5use crate::{
6    Compress, CompressError, Decompress, DecompressError, FlushCompress, FlushDecompress, Status,
7};
8
9#[derive(Debug)]
10pub struct Writer<W: Write, D: Ops> {
11    obj: Option<W>,
12    pub data: D,
13    buf: Vec<u8>,
14}
15
16pub trait Ops {
17    type Error: Into<io::Error>;
18    type Flush: Flush;
19    fn total_in(&self) -> u64;
20    fn total_out(&self) -> u64;
21    fn run(
22        &mut self,
23        input: &[u8],
24        output: &mut [u8],
25        flush: Self::Flush,
26    ) -> Result<Status, Self::Error>;
27    fn run_vec(
28        &mut self,
29        input: &[u8],
30        output: &mut Vec<u8>,
31        flush: Self::Flush,
32    ) -> Result<Status, Self::Error>;
33}
34
35impl Ops for Compress {
36    type Error = CompressError;
37    type Flush = FlushCompress;
38    fn total_in(&self) -> u64 {
39        self.total_in()
40    }
41    fn total_out(&self) -> u64 {
42        self.total_out()
43    }
44    fn run(
45        &mut self,
46        input: &[u8],
47        output: &mut [u8],
48        flush: FlushCompress,
49    ) -> Result<Status, CompressError> {
50        self.compress(input, output, flush)
51    }
52    fn run_vec(
53        &mut self,
54        input: &[u8],
55        output: &mut Vec<u8>,
56        flush: FlushCompress,
57    ) -> Result<Status, CompressError> {
58        self.compress_vec(input, output, flush)
59    }
60}
61
62impl Ops for Decompress {
63    type Error = DecompressError;
64    type Flush = FlushDecompress;
65    fn total_in(&self) -> u64 {
66        self.total_in()
67    }
68    fn total_out(&self) -> u64 {
69        self.total_out()
70    }
71    fn run(
72        &mut self,
73        input: &[u8],
74        output: &mut [u8],
75        flush: FlushDecompress,
76    ) -> Result<Status, DecompressError> {
77        self.decompress(input, output, flush)
78    }
79    fn run_vec(
80        &mut self,
81        input: &[u8],
82        output: &mut Vec<u8>,
83        flush: FlushDecompress,
84    ) -> Result<Status, DecompressError> {
85        self.decompress_vec(input, output, flush)
86    }
87}
88
/// Constructors for the three flush modes the generic code in this file asks
/// a codec for, so it can name them without knowing the concrete flush type.
pub trait Flush {
    /// The mode used for ordinary calls that request no flushing.
    fn none() -> Self;

    /// The mode used when the writer's `flush` is invoked.
    fn sync() -> Self;

    /// The mode used when the stream is being finished (end of input).
    fn finish() -> Self;
}
94
95impl Flush for FlushCompress {
96    fn none() -> Self {
97        FlushCompress::None
98    }
99
100    fn sync() -> Self {
101        FlushCompress::Sync
102    }
103
104    fn finish() -> Self {
105        FlushCompress::Finish
106    }
107}
108
109impl Flush for FlushDecompress {
110    fn none() -> Self {
111        FlushDecompress::None
112    }
113
114    fn sync() -> Self {
115        FlushDecompress::Sync
116    }
117
118    fn finish() -> Self {
119        FlushDecompress::Finish
120    }
121}
122
123pub fn read<R, D>(obj: &mut R, data: &mut D, dst: &mut [u8]) -> io::Result<usize>
124where
125    R: BufRead,
126    D: Ops,
127{
128    loop {
129        let (read, consumed, ret, eof);
130        {
131            let input = obj.fill_buf()?;
132            eof = input.is_empty();
133            let before_out = data.total_out();
134            let before_in = data.total_in();
135            let flush = if eof {
136                D::Flush::finish()
137            } else {
138                D::Flush::none()
139            };
140            ret = data.run(input, dst, flush);
141            read = (data.total_out() - before_out) as usize;
142            consumed = (data.total_in() - before_in) as usize;
143        }
144        obj.consume(consumed);
145
146        match ret {
147            // If we haven't ready any data and we haven't hit EOF yet,
148            // then we need to keep asking for more data because if we
149            // return that 0 bytes of data have been read then it will
150            // be interpreted as EOF.
151            Ok(Status::Ok | Status::BufError) if read == 0 && !eof && !dst.is_empty() => continue,
152            Ok(Status::Ok | Status::BufError | Status::StreamEnd) => return Ok(read),
153
154            Err(..) => {
155                return Err(io::Error::new(
156                    io::ErrorKind::InvalidInput,
157                    "corrupt deflate stream",
158                ))
159            }
160        }
161    }
162}
163
164impl<W: Write, D: Ops> Writer<W, D> {
165    pub fn new(w: W, d: D) -> Writer<W, D> {
166        Writer {
167            obj: Some(w),
168            data: d,
169            buf: Vec::with_capacity(32 * 1024),
170        }
171    }
172
173    pub fn finish(&mut self) -> io::Result<()> {
174        loop {
175            self.dump()?;
176
177            let before = self.data.total_out();
178            self.data
179                .run_vec(&[], &mut self.buf, Flush::finish())
180                .map_err(Into::into)?;
181            if before == self.data.total_out() {
182                return Ok(());
183            }
184        }
185    }
186
187    pub fn replace(&mut self, w: W) -> W {
188        self.buf.truncate(0);
189        mem::replace(self.get_mut(), w)
190    }
191
192    pub fn get_ref(&self) -> &W {
193        self.obj.as_ref().unwrap()
194    }
195
196    pub fn get_mut(&mut self) -> &mut W {
197        self.obj.as_mut().unwrap()
198    }
199
200    // Note that this should only be called if the outer object is just about
201    // to be consumed!
202    //
203    // (e.g. an implementation of `into_inner`)
204    pub fn take_inner(&mut self) -> W {
205        self.obj.take().unwrap()
206    }
207
208    pub fn is_present(&self) -> bool {
209        self.obj.is_some()
210    }
211
212    // Returns total written bytes and status of underlying codec
213    pub(crate) fn write_with_status(&mut self, buf: &[u8]) -> io::Result<(usize, Status)> {
214        // miniz isn't guaranteed to actually write any of the buffer provided,
215        // it may be in a flushing mode where it's just giving us data before
216        // we're actually giving it any data. We don't want to spuriously return
217        // `Ok(0)` when possible as it will cause calls to write_all() to fail.
218        // As a result we execute this in a loop to ensure that we try our
219        // darndest to write the data.
220        loop {
221            self.dump()?;
222
223            let before_in = self.data.total_in();
224            let ret = self.data.run_vec(buf, &mut self.buf, D::Flush::none());
225            let written = (self.data.total_in() - before_in) as usize;
226            let is_stream_end = matches!(ret, Ok(Status::StreamEnd));
227
228            if !buf.is_empty() && written == 0 && ret.is_ok() && !is_stream_end {
229                continue;
230            }
231            return match ret {
232                Ok(st) => match st {
233                    Status::Ok | Status::BufError | Status::StreamEnd => Ok((written, st)),
234                },
235                Err(..) => Err(io::Error::new(
236                    io::ErrorKind::InvalidInput,
237                    "corrupt deflate stream",
238                )),
239            };
240        }
241    }
242
243    fn dump(&mut self) -> io::Result<()> {
244        // TODO: should manage this buffer not with `drain` but probably more of
245        // a deque-like strategy.
246        while !self.buf.is_empty() {
247            let n = self.obj.as_mut().unwrap().write(&self.buf)?;
248            if n == 0 {
249                return Err(io::ErrorKind::WriteZero.into());
250            }
251            self.buf.drain(..n);
252        }
253        Ok(())
254    }
255}
256
257impl<W: Write, D: Ops> Write for Writer<W, D> {
258    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
259        self.write_with_status(buf).map(|res| res.0)
260    }
261
262    fn flush(&mut self) -> io::Result<()> {
263        self.data
264            .run_vec(&[], &mut self.buf, Flush::sync())
265            .map_err(Into::into)?;
266
267        // Unfortunately miniz doesn't actually tell us when we're done with
268        // pulling out all the data from the internal stream. To remedy this we
269        // have to continually ask the stream for more memory until it doesn't
270        // give us a chunk of memory the same size as our own internal buffer,
271        // at which point we assume it's reached the end.
272        loop {
273            self.dump()?;
274            let before = self.data.total_out();
275            self.data
276                .run_vec(&[], &mut self.buf, Flush::none())
277                .map_err(Into::into)?;
278            if before == self.data.total_out() {
279                break;
280            }
281        }
282
283        self.obj.as_mut().unwrap().flush()
284    }
285}
286
287impl<W: Write, D: Ops> Drop for Writer<W, D> {
288    fn drop(&mut self) {
289        if self.obj.is_some() {
290            let _ = self.finish();
291        }
292    }
293}