tokio/fs/file.rs
1//! Types for working with [`File`].
2//!
3//! [`File`]: File
4
5use crate::fs::{asyncify, OpenOptions};
6use crate::io::blocking::{Buf, DEFAULT_MAX_BUF_SIZE};
7use crate::io::{AsyncRead, AsyncSeek, AsyncWrite, ReadBuf};
8use crate::sync::Mutex;
9
10use std::cmp;
11use std::fmt;
12use std::fs::{Metadata, Permissions};
13use std::future::Future;
14use std::io::{self, Seek, SeekFrom};
15use std::path::Path;
16use std::pin::Pin;
17use std::sync::Arc;
18use std::task::{ready, Context, Poll};
19
20#[cfg(test)]
21use super::mocks::JoinHandle;
22#[cfg(test)]
23use super::mocks::MockFile as StdFile;
24#[cfg(test)]
25use super::mocks::{spawn_blocking, spawn_mandatory_blocking};
26#[cfg(not(test))]
27use crate::blocking::JoinHandle;
28#[cfg(not(test))]
29use crate::blocking::{spawn_blocking, spawn_mandatory_blocking};
30#[cfg(not(test))]
31use std::fs::File as StdFile;
32
33/// A reference to an open file on the filesystem.
34///
35/// This is a specialized version of [`std::fs::File`] for usage from the
36/// Tokio runtime.
37///
38/// An instance of a `File` can be read and/or written depending on what options
39/// it was opened with. Files also implement [`AsyncSeek`] to alter the logical
40/// cursor that the file contains internally.
41///
42/// A file will not be closed immediately when it goes out of scope if there
43/// are any IO operations that have not yet completed. To ensure that a file is
44/// closed immediately when it is dropped, you should call [`flush`] before
45/// dropping it. Note that this does not ensure that the file has been fully
46/// written to disk; the operating system might keep the changes around in an
47/// in-memory buffer. See the [`sync_all`] method for telling the OS to write
48/// the data to disk.
49///
50/// Reading and writing to a `File` is usually done using the convenience
51/// methods found on the [`AsyncReadExt`] and [`AsyncWriteExt`] traits.
52///
53/// [`AsyncSeek`]: trait@crate::io::AsyncSeek
54/// [`flush`]: fn@crate::io::AsyncWriteExt::flush
55/// [`sync_all`]: fn@crate::fs::File::sync_all
56/// [`AsyncReadExt`]: trait@crate::io::AsyncReadExt
57/// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt
58///
59/// # Examples
60///
61/// Create a new file and asynchronously write bytes to it:
62///
63/// ```no_run
64/// use tokio::fs::File;
65/// use tokio::io::AsyncWriteExt; // for write_all()
66///
67/// # async fn dox() -> std::io::Result<()> {
68/// let mut file = File::create("foo.txt").await?;
69/// file.write_all(b"hello, world!").await?;
70/// # Ok(())
71/// # }
72/// ```
73///
74/// Read the contents of a file into a buffer:
75///
76/// ```no_run
77/// use tokio::fs::File;
78/// use tokio::io::AsyncReadExt; // for read_to_end()
79///
80/// # async fn dox() -> std::io::Result<()> {
81/// let mut file = File::open("foo.txt").await?;
82///
83/// let mut contents = vec![];
84/// file.read_to_end(&mut contents).await?;
85///
86/// println!("len = {}", contents.len());
87/// # Ok(())
88/// # }
89/// ```
90pub struct File {
91 std: Arc<StdFile>,
92 inner: Mutex<Inner>,
93 max_buf_size: usize,
94}
95
96struct Inner {
97 state: State,
98
99 /// Errors from writes/flushes are returned in write/flush calls. If a write
100 /// error is observed while performing a read, it is saved until the next
101 /// write / flush call.
102 last_write_err: Option<io::ErrorKind>,
103
104 pos: u64,
105}
106
107#[derive(Debug)]
108enum State {
109 Idle(Option<Buf>),
110 Busy(JoinHandle<(Operation, Buf)>),
111}
112
113#[derive(Debug)]
114enum Operation {
115 Read(io::Result<usize>),
116 Write(io::Result<()>),
117 Seek(io::Result<u64>),
118}
119
120impl File {
121 /// Attempts to open a file in read-only mode.
122 ///
123 /// See [`OpenOptions`] for more details.
124 ///
125 /// # Errors
126 ///
127 /// This function will return an error if called from outside of the Tokio
128 /// runtime or if path does not already exist. Other errors may also be
129 /// returned according to `OpenOptions::open`.
130 ///
131 /// # Examples
132 ///
133 /// ```no_run
134 /// use tokio::fs::File;
135 /// use tokio::io::AsyncReadExt;
136 ///
137 /// # async fn dox() -> std::io::Result<()> {
138 /// let mut file = File::open("foo.txt").await?;
139 ///
140 /// let mut contents = vec![];
141 /// file.read_to_end(&mut contents).await?;
142 ///
143 /// println!("len = {}", contents.len());
144 /// # Ok(())
145 /// # }
146 /// ```
147 ///
148 /// The [`read_to_end`] method is defined on the [`AsyncReadExt`] trait.
149 ///
150 /// [`read_to_end`]: fn@crate::io::AsyncReadExt::read_to_end
151 /// [`AsyncReadExt`]: trait@crate::io::AsyncReadExt
152 pub async fn open(path: impl AsRef<Path>) -> io::Result<File> {
153 let path = path.as_ref().to_owned();
154 let std = asyncify(|| StdFile::open(path)).await?;
155
156 Ok(File::from_std(std))
157 }
158
159 /// Opens a file in write-only mode.
160 ///
161 /// This function will create a file if it does not exist, and will truncate
162 /// it if it does.
163 ///
164 /// See [`OpenOptions`] for more details.
165 ///
166 /// # Errors
167 ///
168 /// Results in an error if called from outside of the Tokio runtime or if
169 /// the underlying [`create`] call results in an error.
170 ///
171 /// [`create`]: std::fs::File::create
172 ///
173 /// # Examples
174 ///
175 /// ```no_run
176 /// use tokio::fs::File;
177 /// use tokio::io::AsyncWriteExt;
178 ///
179 /// # async fn dox() -> std::io::Result<()> {
180 /// let mut file = File::create("foo.txt").await?;
181 /// file.write_all(b"hello, world!").await?;
182 /// # Ok(())
183 /// # }
184 /// ```
185 ///
186 /// The [`write_all`] method is defined on the [`AsyncWriteExt`] trait.
187 ///
188 /// [`write_all`]: fn@crate::io::AsyncWriteExt::write_all
189 /// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt
190 pub async fn create(path: impl AsRef<Path>) -> io::Result<File> {
191 let path = path.as_ref().to_owned();
192 let std_file = asyncify(move || StdFile::create(path)).await?;
193 Ok(File::from_std(std_file))
194 }
195
196 /// Opens a file in read-write mode.
197 ///
198 /// This function will create a file if it does not exist, or return an error
199 /// if it does. This way, if the call succeeds, the file returned is guaranteed
200 /// to be new.
201 ///
202 /// This option is useful because it is atomic. Otherwise between checking
203 /// whether a file exists and creating a new one, the file may have been
204 /// created by another process (a TOCTOU race condition / attack).
205 ///
206 /// This can also be written using `File::options().read(true).write(true).create_new(true).open(...)`.
207 ///
208 /// See [`OpenOptions`] for more details.
209 ///
210 /// # Examples
211 ///
212 /// ```no_run
213 /// use tokio::fs::File;
214 /// use tokio::io::AsyncWriteExt;
215 ///
216 /// # async fn dox() -> std::io::Result<()> {
217 /// let mut file = File::create_new("foo.txt").await?;
218 /// file.write_all(b"hello, world!").await?;
219 /// # Ok(())
220 /// # }
221 /// ```
222 ///
223 /// The [`write_all`] method is defined on the [`AsyncWriteExt`] trait.
224 ///
225 /// [`write_all`]: fn@crate::io::AsyncWriteExt::write_all
226 /// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt
227 pub async fn create_new<P: AsRef<Path>>(path: P) -> std::io::Result<File> {
228 Self::options()
229 .read(true)
230 .write(true)
231 .create_new(true)
232 .open(path)
233 .await
234 }
235
236 /// Returns a new [`OpenOptions`] object.
237 ///
238 /// This function returns a new `OpenOptions` object that you can use to
239 /// open or create a file with specific options if `open()` or `create()`
240 /// are not appropriate.
241 ///
242 /// It is equivalent to `OpenOptions::new()`, but allows you to write more
243 /// readable code. Instead of
244 /// `OpenOptions::new().append(true).open("example.log")`,
245 /// you can write `File::options().append(true).open("example.log")`. This
246 /// also avoids the need to import `OpenOptions`.
247 ///
248 /// See the [`OpenOptions::new`] function for more details.
249 ///
250 /// # Examples
251 ///
252 /// ```no_run
253 /// use tokio::fs::File;
254 /// use tokio::io::AsyncWriteExt;
255 ///
256 /// # async fn dox() -> std::io::Result<()> {
257 /// let mut f = File::options().append(true).open("example.log").await?;
258 /// f.write_all(b"new line\n").await?;
259 /// # Ok(())
260 /// # }
261 /// ```
262 #[must_use]
263 pub fn options() -> OpenOptions {
264 OpenOptions::new()
265 }
266
267 /// Converts a [`std::fs::File`] to a [`tokio::fs::File`](File).
268 ///
269 /// # Examples
270 ///
271 /// ```no_run
272 /// // This line could block. It is not recommended to do this on the Tokio
273 /// // runtime.
274 /// let std_file = std::fs::File::open("foo.txt").unwrap();
275 /// let file = tokio::fs::File::from_std(std_file);
276 /// ```
277 pub fn from_std(std: StdFile) -> File {
278 File {
279 std: Arc::new(std),
280 inner: Mutex::new(Inner {
281 state: State::Idle(Some(Buf::with_capacity(0))),
282 last_write_err: None,
283 pos: 0,
284 }),
285 max_buf_size: DEFAULT_MAX_BUF_SIZE,
286 }
287 }
288
289 /// Attempts to sync all OS-internal metadata to disk.
290 ///
291 /// This function will attempt to ensure that all in-core data reaches the
292 /// filesystem before returning.
293 ///
294 /// # Examples
295 ///
296 /// ```no_run
297 /// use tokio::fs::File;
298 /// use tokio::io::AsyncWriteExt;
299 ///
300 /// # async fn dox() -> std::io::Result<()> {
301 /// let mut file = File::create("foo.txt").await?;
302 /// file.write_all(b"hello, world!").await?;
303 /// file.sync_all().await?;
304 /// # Ok(())
305 /// # }
306 /// ```
307 ///
308 /// The [`write_all`] method is defined on the [`AsyncWriteExt`] trait.
309 ///
310 /// [`write_all`]: fn@crate::io::AsyncWriteExt::write_all
311 /// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt
312 pub async fn sync_all(&self) -> io::Result<()> {
313 let mut inner = self.inner.lock().await;
314 inner.complete_inflight().await;
315
316 let std = self.std.clone();
317 asyncify(move || std.sync_all()).await
318 }
319
320 /// This function is similar to `sync_all`, except that it may not
321 /// synchronize file metadata to the filesystem.
322 ///
323 /// This is intended for use cases that must synchronize content, but don't
324 /// need the metadata on disk. The goal of this method is to reduce disk
325 /// operations.
326 ///
327 /// Note that some platforms may simply implement this in terms of `sync_all`.
328 ///
329 /// # Examples
330 ///
331 /// ```no_run
332 /// use tokio::fs::File;
333 /// use tokio::io::AsyncWriteExt;
334 ///
335 /// # async fn dox() -> std::io::Result<()> {
336 /// let mut file = File::create("foo.txt").await?;
337 /// file.write_all(b"hello, world!").await?;
338 /// file.sync_data().await?;
339 /// # Ok(())
340 /// # }
341 /// ```
342 ///
343 /// The [`write_all`] method is defined on the [`AsyncWriteExt`] trait.
344 ///
345 /// [`write_all`]: fn@crate::io::AsyncWriteExt::write_all
346 /// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt
347 pub async fn sync_data(&self) -> io::Result<()> {
348 let mut inner = self.inner.lock().await;
349 inner.complete_inflight().await;
350
351 let std = self.std.clone();
352 asyncify(move || std.sync_data()).await
353 }
354
355 /// Truncates or extends the underlying file, updating the size of this file to become size.
356 ///
357 /// If the size is less than the current file's size, then the file will be
358 /// shrunk. If it is greater than the current file's size, then the file
359 /// will be extended to size and have all of the intermediate data filled in
360 /// with 0s.
361 ///
362 /// # Errors
363 ///
364 /// This function will return an error if the file is not opened for
365 /// writing.
366 ///
367 /// # Examples
368 ///
369 /// ```no_run
370 /// use tokio::fs::File;
371 /// use tokio::io::AsyncWriteExt;
372 ///
373 /// # async fn dox() -> std::io::Result<()> {
374 /// let mut file = File::create("foo.txt").await?;
375 /// file.write_all(b"hello, world!").await?;
376 /// file.set_len(10).await?;
377 /// # Ok(())
378 /// # }
379 /// ```
380 ///
381 /// The [`write_all`] method is defined on the [`AsyncWriteExt`] trait.
382 ///
383 /// [`write_all`]: fn@crate::io::AsyncWriteExt::write_all
384 /// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt
385 pub async fn set_len(&self, size: u64) -> io::Result<()> {
386 let mut inner = self.inner.lock().await;
387 inner.complete_inflight().await;
388
389 let mut buf = match inner.state {
390 State::Idle(ref mut buf_cell) => buf_cell.take().unwrap(),
391 _ => unreachable!(),
392 };
393
394 let seek = if !buf.is_empty() {
395 Some(SeekFrom::Current(buf.discard_read()))
396 } else {
397 None
398 };
399
400 let std = self.std.clone();
401
402 inner.state = State::Busy(spawn_blocking(move || {
403 let res = if let Some(seek) = seek {
404 (&*std).seek(seek).and_then(|_| std.set_len(size))
405 } else {
406 std.set_len(size)
407 }
408 .map(|()| 0); // the value is discarded later
409
410 // Return the result as a seek
411 (Operation::Seek(res), buf)
412 }));
413
414 let (op, buf) = match inner.state {
415 State::Idle(_) => unreachable!(),
416 State::Busy(ref mut rx) => rx.await?,
417 };
418
419 inner.state = State::Idle(Some(buf));
420
421 match op {
422 Operation::Seek(res) => res.map(|pos| {
423 inner.pos = pos;
424 }),
425 _ => unreachable!(),
426 }
427 }
428
429 /// Queries metadata about the underlying file.
430 ///
431 /// # Examples
432 ///
433 /// ```no_run
434 /// use tokio::fs::File;
435 ///
436 /// # async fn dox() -> std::io::Result<()> {
437 /// let file = File::open("foo.txt").await?;
438 /// let metadata = file.metadata().await?;
439 ///
440 /// println!("{:?}", metadata);
441 /// # Ok(())
442 /// # }
443 /// ```
444 pub async fn metadata(&self) -> io::Result<Metadata> {
445 let std = self.std.clone();
446 asyncify(move || std.metadata()).await
447 }
448
449 /// Creates a new `File` instance that shares the same underlying file handle
450 /// as the existing `File` instance. Reads, writes, and seeks will affect both
451 /// File instances simultaneously.
452 ///
453 /// # Examples
454 ///
455 /// ```no_run
456 /// use tokio::fs::File;
457 ///
458 /// # async fn dox() -> std::io::Result<()> {
459 /// let file = File::open("foo.txt").await?;
460 /// let file_clone = file.try_clone().await?;
461 /// # Ok(())
462 /// # }
463 /// ```
464 pub async fn try_clone(&self) -> io::Result<File> {
465 self.inner.lock().await.complete_inflight().await;
466 let std = self.std.clone();
467 let std_file = asyncify(move || std.try_clone()).await?;
468 Ok(File::from_std(std_file))
469 }
470
471 /// Destructures `File` into a [`std::fs::File`]. This function is
472 /// async to allow any in-flight operations to complete.
473 ///
474 /// Use `File::try_into_std` to attempt conversion immediately.
475 ///
476 /// # Examples
477 ///
478 /// ```no_run
479 /// use tokio::fs::File;
480 ///
481 /// # async fn dox() -> std::io::Result<()> {
482 /// let tokio_file = File::open("foo.txt").await?;
483 /// let std_file = tokio_file.into_std().await;
484 /// # Ok(())
485 /// # }
486 /// ```
487 pub async fn into_std(mut self) -> StdFile {
488 self.inner.get_mut().complete_inflight().await;
489 Arc::try_unwrap(self.std).expect("Arc::try_unwrap failed")
490 }
491
492 /// Tries to immediately destructure `File` into a [`std::fs::File`].
493 ///
494 /// # Errors
495 ///
496 /// This function will return an error containing the file if some
497 /// operation is in-flight.
498 ///
499 /// # Examples
500 ///
501 /// ```no_run
502 /// use tokio::fs::File;
503 ///
504 /// # async fn dox() -> std::io::Result<()> {
505 /// let tokio_file = File::open("foo.txt").await?;
506 /// let std_file = tokio_file.try_into_std().unwrap();
507 /// # Ok(())
508 /// # }
509 /// ```
510 pub fn try_into_std(mut self) -> Result<StdFile, Self> {
511 match Arc::try_unwrap(self.std) {
512 Ok(file) => Ok(file),
513 Err(std_file_arc) => {
514 self.std = std_file_arc;
515 Err(self)
516 }
517 }
518 }
519
520 /// Changes the permissions on the underlying file.
521 ///
522 /// # Platform-specific behavior
523 ///
524 /// This function currently corresponds to the `fchmod` function on Unix and
525 /// the `SetFileInformationByHandle` function on Windows. Note that, this
526 /// [may change in the future][changes].
527 ///
528 /// [changes]: https://doc.rust-lang.org/std/io/index.html#platform-specific-behavior
529 ///
530 /// # Errors
531 ///
532 /// This function will return an error if the user lacks permission change
533 /// attributes on the underlying file. It may also return an error in other
534 /// os-specific unspecified cases.
535 ///
536 /// # Examples
537 ///
538 /// ```no_run
539 /// use tokio::fs::File;
540 ///
541 /// # async fn dox() -> std::io::Result<()> {
542 /// let file = File::open("foo.txt").await?;
543 /// let mut perms = file.metadata().await?.permissions();
544 /// perms.set_readonly(true);
545 /// file.set_permissions(perms).await?;
546 /// # Ok(())
547 /// # }
548 /// ```
549 pub async fn set_permissions(&self, perm: Permissions) -> io::Result<()> {
550 let std = self.std.clone();
551 asyncify(move || std.set_permissions(perm)).await
552 }
553
554 /// Set the maximum buffer size for the underlying [`AsyncRead`] / [`AsyncWrite`] operation.
555 ///
556 /// Although Tokio uses a sensible default value for this buffer size, this function would be
557 /// useful for changing that default depending on the situation.
558 ///
559 /// # Examples
560 ///
561 /// ```no_run
562 /// use tokio::fs::File;
563 /// use tokio::io::AsyncWriteExt;
564 ///
565 /// # async fn dox() -> std::io::Result<()> {
566 /// let mut file = File::open("foo.txt").await?;
567 ///
568 /// // Set maximum buffer size to 8 MiB
569 /// file.set_max_buf_size(8 * 1024 * 1024);
570 ///
571 /// let mut buf = vec![1; 1024 * 1024 * 1024];
572 ///
573 /// // Write the 1 GiB buffer in chunks up to 8 MiB each.
574 /// file.write_all(&mut buf).await?;
575 /// # Ok(())
576 /// # }
577 /// ```
578 pub fn set_max_buf_size(&mut self, max_buf_size: usize) {
579 self.max_buf_size = max_buf_size;
580 }
581}
582
583impl AsyncRead for File {
584 fn poll_read(
585 self: Pin<&mut Self>,
586 cx: &mut Context<'_>,
587 dst: &mut ReadBuf<'_>,
588 ) -> Poll<io::Result<()>> {
589 ready!(crate::trace::trace_leaf(cx));
590
591 let me = self.get_mut();
592 let inner = me.inner.get_mut();
593
594 loop {
595 match inner.state {
596 State::Idle(ref mut buf_cell) => {
597 let mut buf = buf_cell.take().unwrap();
598
599 if !buf.is_empty() || dst.remaining() == 0 {
600 buf.copy_to(dst);
601 *buf_cell = Some(buf);
602 return Poll::Ready(Ok(()));
603 }
604
605 let std = me.std.clone();
606
607 let max_buf_size = cmp::min(dst.remaining(), me.max_buf_size);
608 inner.state = State::Busy(spawn_blocking(move || {
609 // SAFETY: the `Read` implementation of `std` does not
610 // read from the buffer it is borrowing and correctly
611 // reports the length of the data written into the buffer.
612 let res = unsafe { buf.read_from(&mut &*std, max_buf_size) };
613 (Operation::Read(res), buf)
614 }));
615 }
616 State::Busy(ref mut rx) => {
617 let (op, mut buf) = ready!(Pin::new(rx).poll(cx))?;
618
619 match op {
620 Operation::Read(Ok(_)) => {
621 buf.copy_to(dst);
622 inner.state = State::Idle(Some(buf));
623 return Poll::Ready(Ok(()));
624 }
625 Operation::Read(Err(e)) => {
626 assert!(buf.is_empty());
627
628 inner.state = State::Idle(Some(buf));
629 return Poll::Ready(Err(e));
630 }
631 Operation::Write(Ok(())) => {
632 assert!(buf.is_empty());
633 inner.state = State::Idle(Some(buf));
634 continue;
635 }
636 Operation::Write(Err(e)) => {
637 assert!(inner.last_write_err.is_none());
638 inner.last_write_err = Some(e.kind());
639 inner.state = State::Idle(Some(buf));
640 }
641 Operation::Seek(result) => {
642 assert!(buf.is_empty());
643 inner.state = State::Idle(Some(buf));
644 if let Ok(pos) = result {
645 inner.pos = pos;
646 }
647 continue;
648 }
649 }
650 }
651 }
652 }
653 }
654}
655
656impl AsyncSeek for File {
657 fn start_seek(self: Pin<&mut Self>, mut pos: SeekFrom) -> io::Result<()> {
658 let me = self.get_mut();
659 let inner = me.inner.get_mut();
660
661 match inner.state {
662 State::Busy(_) => Err(io::Error::new(
663 io::ErrorKind::Other,
664 "other file operation is pending, call poll_complete before start_seek",
665 )),
666 State::Idle(ref mut buf_cell) => {
667 let mut buf = buf_cell.take().unwrap();
668
669 // Factor in any unread data from the buf
670 if !buf.is_empty() {
671 let n = buf.discard_read();
672
673 if let SeekFrom::Current(ref mut offset) = pos {
674 *offset += n;
675 }
676 }
677
678 let std = me.std.clone();
679
680 inner.state = State::Busy(spawn_blocking(move || {
681 let res = (&*std).seek(pos);
682 (Operation::Seek(res), buf)
683 }));
684 Ok(())
685 }
686 }
687 }
688
689 fn poll_complete(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<u64>> {
690 ready!(crate::trace::trace_leaf(cx));
691 let inner = self.inner.get_mut();
692
693 loop {
694 match inner.state {
695 State::Idle(_) => return Poll::Ready(Ok(inner.pos)),
696 State::Busy(ref mut rx) => {
697 let (op, buf) = ready!(Pin::new(rx).poll(cx))?;
698 inner.state = State::Idle(Some(buf));
699
700 match op {
701 Operation::Read(_) => {}
702 Operation::Write(Err(e)) => {
703 assert!(inner.last_write_err.is_none());
704 inner.last_write_err = Some(e.kind());
705 }
706 Operation::Write(_) => {}
707 Operation::Seek(res) => {
708 if let Ok(pos) = res {
709 inner.pos = pos;
710 }
711 return Poll::Ready(res);
712 }
713 }
714 }
715 }
716 }
717 }
718}
719
720impl AsyncWrite for File {
721 fn poll_write(
722 self: Pin<&mut Self>,
723 cx: &mut Context<'_>,
724 src: &[u8],
725 ) -> Poll<io::Result<usize>> {
726 ready!(crate::trace::trace_leaf(cx));
727 let me = self.get_mut();
728 let inner = me.inner.get_mut();
729
730 if let Some(e) = inner.last_write_err.take() {
731 return Poll::Ready(Err(e.into()));
732 }
733
734 loop {
735 match inner.state {
736 State::Idle(ref mut buf_cell) => {
737 let mut buf = buf_cell.take().unwrap();
738
739 let seek = if !buf.is_empty() {
740 Some(SeekFrom::Current(buf.discard_read()))
741 } else {
742 None
743 };
744
745 let n = buf.copy_from(src, me.max_buf_size);
746 let std = me.std.clone();
747
748 let blocking_task_join_handle = spawn_mandatory_blocking(move || {
749 let res = if let Some(seek) = seek {
750 (&*std).seek(seek).and_then(|_| buf.write_to(&mut &*std))
751 } else {
752 buf.write_to(&mut &*std)
753 };
754
755 (Operation::Write(res), buf)
756 })
757 .ok_or_else(|| {
758 io::Error::new(io::ErrorKind::Other, "background task failed")
759 })?;
760
761 inner.state = State::Busy(blocking_task_join_handle);
762
763 return Poll::Ready(Ok(n));
764 }
765 State::Busy(ref mut rx) => {
766 let (op, buf) = ready!(Pin::new(rx).poll(cx))?;
767 inner.state = State::Idle(Some(buf));
768
769 match op {
770 Operation::Read(_) => {
771 // We don't care about the result here. The fact
772 // that the cursor has advanced will be reflected in
773 // the next iteration of the loop
774 continue;
775 }
776 Operation::Write(res) => {
777 // If the previous write was successful, continue.
778 // Otherwise, error.
779 res?;
780 continue;
781 }
782 Operation::Seek(_) => {
783 // Ignore the seek
784 continue;
785 }
786 }
787 }
788 }
789 }
790 }
791
792 fn poll_write_vectored(
793 self: Pin<&mut Self>,
794 cx: &mut Context<'_>,
795 bufs: &[io::IoSlice<'_>],
796 ) -> Poll<Result<usize, io::Error>> {
797 ready!(crate::trace::trace_leaf(cx));
798 let me = self.get_mut();
799 let inner = me.inner.get_mut();
800
801 if let Some(e) = inner.last_write_err.take() {
802 return Poll::Ready(Err(e.into()));
803 }
804
805 loop {
806 match inner.state {
807 State::Idle(ref mut buf_cell) => {
808 let mut buf = buf_cell.take().unwrap();
809
810 let seek = if !buf.is_empty() {
811 Some(SeekFrom::Current(buf.discard_read()))
812 } else {
813 None
814 };
815
816 let n = buf.copy_from_bufs(bufs, me.max_buf_size);
817 let std = me.std.clone();
818
819 let blocking_task_join_handle = spawn_mandatory_blocking(move || {
820 let res = if let Some(seek) = seek {
821 (&*std).seek(seek).and_then(|_| buf.write_to(&mut &*std))
822 } else {
823 buf.write_to(&mut &*std)
824 };
825
826 (Operation::Write(res), buf)
827 })
828 .ok_or_else(|| {
829 io::Error::new(io::ErrorKind::Other, "background task failed")
830 })?;
831
832 inner.state = State::Busy(blocking_task_join_handle);
833
834 return Poll::Ready(Ok(n));
835 }
836 State::Busy(ref mut rx) => {
837 let (op, buf) = ready!(Pin::new(rx).poll(cx))?;
838 inner.state = State::Idle(Some(buf));
839
840 match op {
841 Operation::Read(_) => {
842 // We don't care about the result here. The fact
843 // that the cursor has advanced will be reflected in
844 // the next iteration of the loop
845 continue;
846 }
847 Operation::Write(res) => {
848 // If the previous write was successful, continue.
849 // Otherwise, error.
850 res?;
851 continue;
852 }
853 Operation::Seek(_) => {
854 // Ignore the seek
855 continue;
856 }
857 }
858 }
859 }
860 }
861 }
862
863 fn is_write_vectored(&self) -> bool {
864 true
865 }
866
867 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
868 ready!(crate::trace::trace_leaf(cx));
869 let inner = self.inner.get_mut();
870 inner.poll_flush(cx)
871 }
872
873 fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
874 ready!(crate::trace::trace_leaf(cx));
875 self.poll_flush(cx)
876 }
877}
878
879impl From<StdFile> for File {
880 fn from(std: StdFile) -> Self {
881 Self::from_std(std)
882 }
883}
884
885impl fmt::Debug for File {
886 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
887 fmt.debug_struct("tokio::fs::File")
888 .field("std", &self.std)
889 .finish()
890 }
891}
892
893#[cfg(unix)]
894impl std::os::unix::io::AsRawFd for File {
895 fn as_raw_fd(&self) -> std::os::unix::io::RawFd {
896 self.std.as_raw_fd()
897 }
898}
899
900#[cfg(unix)]
901impl std::os::unix::io::AsFd for File {
902 fn as_fd(&self) -> std::os::unix::io::BorrowedFd<'_> {
903 unsafe {
904 std::os::unix::io::BorrowedFd::borrow_raw(std::os::unix::io::AsRawFd::as_raw_fd(self))
905 }
906 }
907}
908
909#[cfg(unix)]
910impl std::os::unix::io::FromRawFd for File {
911 unsafe fn from_raw_fd(fd: std::os::unix::io::RawFd) -> Self {
912 StdFile::from_raw_fd(fd).into()
913 }
914}
915
916cfg_windows! {
917 use crate::os::windows::io::{AsRawHandle, FromRawHandle, RawHandle, AsHandle, BorrowedHandle};
918
919 impl AsRawHandle for File {
920 fn as_raw_handle(&self) -> RawHandle {
921 self.std.as_raw_handle()
922 }
923 }
924
925 impl AsHandle for File {
926 fn as_handle(&self) -> BorrowedHandle<'_> {
927 unsafe {
928 BorrowedHandle::borrow_raw(
929 AsRawHandle::as_raw_handle(self),
930 )
931 }
932 }
933 }
934
935 impl FromRawHandle for File {
936 unsafe fn from_raw_handle(handle: RawHandle) -> Self {
937 StdFile::from_raw_handle(handle).into()
938 }
939 }
940}
941
942impl Inner {
943 async fn complete_inflight(&mut self) {
944 use std::future::poll_fn;
945
946 poll_fn(|cx| self.poll_complete_inflight(cx)).await;
947 }
948
949 fn poll_complete_inflight(&mut self, cx: &mut Context<'_>) -> Poll<()> {
950 ready!(crate::trace::trace_leaf(cx));
951 match self.poll_flush(cx) {
952 Poll::Ready(Err(e)) => {
953 self.last_write_err = Some(e.kind());
954 Poll::Ready(())
955 }
956 Poll::Ready(Ok(())) => Poll::Ready(()),
957 Poll::Pending => Poll::Pending,
958 }
959 }
960
961 fn poll_flush(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
962 if let Some(e) = self.last_write_err.take() {
963 return Poll::Ready(Err(e.into()));
964 }
965
966 let (op, buf) = match self.state {
967 State::Idle(_) => return Poll::Ready(Ok(())),
968 State::Busy(ref mut rx) => ready!(Pin::new(rx).poll(cx))?,
969 };
970
971 // The buffer is not used here
972 self.state = State::Idle(Some(buf));
973
974 match op {
975 Operation::Read(_) => Poll::Ready(Ok(())),
976 Operation::Write(res) => Poll::Ready(res),
977 Operation::Seek(_) => Poll::Ready(Ok(())),
978 }
979 }
980}
981
982#[cfg(test)]
983mod tests;