// tokio/sync/rwlock.rs
1use crate::sync::batch_semaphore::{Semaphore, TryAcquireError};
2use crate::sync::mutex::TryLockError;
3#[cfg(all(tokio_unstable, feature = "tracing"))]
4use crate::util::trace;
5use std::cell::UnsafeCell;
6use std::marker;
7use std::marker::PhantomData;
8use std::sync::Arc;
9
10pub(crate) mod owned_read_guard;
11pub(crate) mod owned_write_guard;
12pub(crate) mod owned_write_guard_mapped;
13pub(crate) mod read_guard;
14pub(crate) mod write_guard;
15pub(crate) mod write_guard_mapped;
16pub(crate) use owned_read_guard::OwnedRwLockReadGuard;
17pub(crate) use owned_write_guard::OwnedRwLockWriteGuard;
18pub(crate) use owned_write_guard_mapped::OwnedRwLockMappedWriteGuard;
19pub(crate) use read_guard::RwLockReadGuard;
20pub(crate) use write_guard::RwLockWriteGuard;
21pub(crate) use write_guard_mapped::RwLockMappedWriteGuard;
22
// Maximum number of simultaneous read permits (and therefore concurrent
// readers). A writer acquires all of the permits at once, so this also fixes
// the semaphore's total permit count. NOTE(review): presumably `>> 3` leaves
// headroom for internal flag bits in `batch_semaphore`'s permit encoding —
// confirm against that module.
#[cfg(not(loom))]
const MAX_READS: u32 = u32::MAX >> 3;

// Under loom, keep the permit count tiny so the model checker's state space
// stays tractable.
#[cfg(loom)]
const MAX_READS: u32 = 10;
28
29/// An asynchronous reader-writer lock.
30///
31/// This type of lock allows a number of readers or at most one writer at any
32/// point in time. The write portion of this lock typically allows modification
33/// of the underlying data (exclusive access) and the read portion of this lock
34/// typically allows for read-only access (shared access).
35///
36/// In comparison, a [`Mutex`] does not distinguish between readers or writers
37/// that acquire the lock, therefore causing any tasks waiting for the lock to
38/// become available to yield. An `RwLock` will allow any number of readers to
39/// acquire the lock as long as a writer is not holding the lock.
40///
41/// The priority policy of Tokio's read-write lock is _fair_ (or
42/// [_write-preferring_]), in order to ensure that readers cannot starve
43/// writers. Fairness is ensured using a first-in, first-out queue for the tasks
44/// awaiting the lock; a read lock will not be given out until all write lock
45/// requests that were queued before it have been acquired and released. This is
46/// in contrast to the Rust standard library's `std::sync::RwLock`, where the
47/// priority policy is dependent on the operating system's implementation.
48///
49/// The type parameter `T` represents the data that this lock protects. It is
50/// required that `T` satisfies [`Send`] to be shared across threads. The RAII guards
51/// returned from the locking methods implement [`Deref`](trait@std::ops::Deref)
52/// (and [`DerefMut`](trait@std::ops::DerefMut)
53/// for the `write` methods) to allow access to the content of the lock.
54///
55/// # Examples
56///
57/// ```
58/// use tokio::sync::RwLock;
59///
60/// # #[tokio::main(flavor = "current_thread")]
61/// # async fn main() {
62/// let lock = RwLock::new(5);
63///
64/// // many reader locks can be held at once
65/// {
66/// let r1 = lock.read().await;
67/// let r2 = lock.read().await;
68/// assert_eq!(*r1, 5);
69/// assert_eq!(*r2, 5);
70/// } // read locks are dropped at this point
71///
72/// // only one write lock may be held, however
73/// {
74/// let mut w = lock.write().await;
75/// *w += 1;
76/// assert_eq!(*w, 6);
77/// } // write lock is dropped here
78/// # }
79/// ```
80///
81/// [`Mutex`]: struct@super::Mutex
82/// [`RwLock`]: struct@RwLock
83/// [`RwLockReadGuard`]: struct@RwLockReadGuard
84/// [`RwLockWriteGuard`]: struct@RwLockWriteGuard
85/// [`Send`]: trait@std::marker::Send
86/// [_write-preferring_]: https://en.wikipedia.org/wiki/Readers%E2%80%93writer_lock#Priority_policies
pub struct RwLock<T: ?Sized> {
    // Tracing span used to instrument this resource when the `tracing`
    // unstable feature is enabled.
    #[cfg(all(tokio_unstable, feature = "tracing"))]
    resource_span: tracing::Span,

    // Maximum number of concurrent readers; also the number of semaphore
    // permits a writer must acquire to obtain exclusive access.
    mr: u32,

    // Semaphore used to coordinate read and write access to `T`: each reader
    // holds one permit, a writer holds all `mr` permits.
    s: Semaphore,

    // Inner data `T`; guards access it through the raw pointer returned by
    // `UnsafeCell::get`.
    c: UnsafeCell<T>,
}
100
#[test]
#[cfg(not(loom))]
fn bounds() {
    // Compile-time checks that the lock and every guard type satisfy the
    // expected auto-trait bounds (`Send`/`Sync`/`Unpin`); a regression here
    // fails to compile rather than failing at runtime.
    fn check_send<T: Send>() {}
    fn check_sync<T: Sync>() {}
    fn check_unpin<T: Unpin>() {}
    // This has to take a value, since the async fn's return type is unnameable.
    fn check_send_sync_val<T: Send + Sync>(_t: T) {}

    check_send::<RwLock<u32>>();
    check_sync::<RwLock<u32>>();
    check_unpin::<RwLock<u32>>();

    check_send::<RwLockReadGuard<'_, u32>>();
    check_sync::<RwLockReadGuard<'_, u32>>();
    check_unpin::<RwLockReadGuard<'_, u32>>();

    check_send::<OwnedRwLockReadGuard<u32, i32>>();
    check_sync::<OwnedRwLockReadGuard<u32, i32>>();
    check_unpin::<OwnedRwLockReadGuard<u32, i32>>();

    check_send::<RwLockWriteGuard<'_, u32>>();
    check_sync::<RwLockWriteGuard<'_, u32>>();
    check_unpin::<RwLockWriteGuard<'_, u32>>();

    check_send::<RwLockMappedWriteGuard<'_, u32>>();
    check_sync::<RwLockMappedWriteGuard<'_, u32>>();
    check_unpin::<RwLockMappedWriteGuard<'_, u32>>();

    check_send::<OwnedRwLockWriteGuard<u32>>();
    check_sync::<OwnedRwLockWriteGuard<u32>>();
    check_unpin::<OwnedRwLockWriteGuard<u32>>();

    check_send::<OwnedRwLockMappedWriteGuard<u32, i32>>();
    check_sync::<OwnedRwLockMappedWriteGuard<u32, i32>>();
    check_unpin::<OwnedRwLockMappedWriteGuard<u32, i32>>();

    // The lock/unlock futures themselves must also be Send + Sync so they can
    // be awaited from multi-threaded runtimes.
    let rwlock = Arc::new(RwLock::new(0));
    check_send_sync_val(rwlock.read());
    check_send_sync_val(Arc::clone(&rwlock).read_owned());
    check_send_sync_val(rwlock.write());
    check_send_sync_val(Arc::clone(&rwlock).write_owned());
}
144
// As long as T: Send + Sync, it's fine to send and share RwLock<T> between threads.
// If T were not Send, sending and sharing a RwLock<T> would be bad, since you can access T through
// RwLock<T>.
unsafe impl<T> Send for RwLock<T> where T: ?Sized + Send {}
unsafe impl<T> Sync for RwLock<T> where T: ?Sized + Send + Sync {}
// NB: These impls need to be explicit since we're storing a raw pointer.
// Safety: Stores a raw pointer to `T`, so if `T` is `Sync`, the lock guard over
// `T` is `Send`.
unsafe impl<T> Send for RwLockReadGuard<'_, T> where T: ?Sized + Sync {}
unsafe impl<T> Sync for RwLockReadGuard<'_, T> where T: ?Sized + Send + Sync {}
// T is required to be `Send` because an OwnedRwLockReadGuard can be used to drop the value held in
// the RwLock, unlike RwLockReadGuard.
unsafe impl<T, U> Send for OwnedRwLockReadGuard<T, U>
where
    T: ?Sized + Send + Sync,
    U: ?Sized + Sync,
{
}
unsafe impl<T, U> Sync for OwnedRwLockReadGuard<T, U>
where
    T: ?Sized + Send + Sync,
    U: ?Sized + Send + Sync,
{
}
// Safety: a write guard only hands out `&T`/`&mut T` via its Deref impls, so
// sharing a reference to the guard is sound whenever `T` itself is shareable.
unsafe impl<T> Sync for RwLockWriteGuard<'_, T> where T: ?Sized + Send + Sync {}
unsafe impl<T> Sync for OwnedRwLockWriteGuard<T> where T: ?Sized + Send + Sync {}
unsafe impl<T> Sync for RwLockMappedWriteGuard<'_, T> where T: ?Sized + Send + Sync {}
unsafe impl<T, U> Sync for OwnedRwLockMappedWriteGuard<T, U>
where
    T: ?Sized + Send + Sync,
    U: ?Sized + Send + Sync,
{
}
// Safety: Stores a raw pointer to `T`, so if `T` is `Sync`, the lock guard over
// `T` is `Send` - but since this also provides mutable access, we need to
// make sure that `T` is `Send` since its value can be sent across thread
// boundaries.
unsafe impl<T> Send for RwLockWriteGuard<'_, T> where T: ?Sized + Send + Sync {}
unsafe impl<T> Send for OwnedRwLockWriteGuard<T> where T: ?Sized + Send + Sync {}
unsafe impl<T> Send for RwLockMappedWriteGuard<'_, T> where T: ?Sized + Send + Sync {}
unsafe impl<T, U> Send for OwnedRwLockMappedWriteGuard<T, U>
where
    T: ?Sized + Send + Sync,
    U: ?Sized + Send + Sync,
{
}
191
192impl<T: ?Sized> RwLock<T> {
    /// Creates a new instance of an `RwLock<T>` which is unlocked.
    ///
    /// # Examples
    ///
    /// ```
    /// use tokio::sync::RwLock;
    ///
    /// let lock = RwLock::new(5);
    /// ```
    #[track_caller]
    pub fn new(value: T) -> RwLock<T>
    where
        T: Sized,
    {
        // When instrumentation is enabled, create a resource span recording
        // the construction site (via `#[track_caller]`) and the lock's
        // initial state.
        #[cfg(all(tokio_unstable, feature = "tracing"))]
        let resource_span = {
            let location = std::panic::Location::caller();
            let resource_span = tracing::trace_span!(
                parent: None,
                "runtime.resource",
                concrete_type = "RwLock",
                kind = "Sync",
                loc.file = location.file(),
                loc.line = location.line(),
                loc.col = location.column(),
            );

            resource_span.in_scope(|| {
                tracing::trace!(
                    target: "runtime::resource::state_update",
                    max_readers = MAX_READS,
                );

                tracing::trace!(
                    target: "runtime::resource::state_update",
                    write_locked = false,
                );

                tracing::trace!(
                    target: "runtime::resource::state_update",
                    current_readers = 0,
                );
            });

            resource_span
        };

        // The semaphore starts with `MAX_READS` permits: one per allowed
        // reader, all of which a writer must take.
        #[cfg(all(tokio_unstable, feature = "tracing"))]
        let s = resource_span.in_scope(|| Semaphore::new(MAX_READS as usize));

        #[cfg(any(not(tokio_unstable), not(feature = "tracing")))]
        let s = Semaphore::new(MAX_READS as usize);

        RwLock {
            mr: MAX_READS,
            c: UnsafeCell::new(value),
            s,
            #[cfg(all(tokio_unstable, feature = "tracing"))]
            resource_span,
        }
    }
254
    /// Creates a new instance of an `RwLock<T>` which is unlocked
    /// and allows a maximum of `max_reads` concurrent readers.
    ///
    /// # Examples
    ///
    /// ```
    /// use tokio::sync::RwLock;
    ///
    /// let lock = RwLock::with_max_readers(5, 1024);
    /// ```
    ///
    /// # Panics
    ///
    /// Panics if `max_reads` is more than `u32::MAX >> 3`.
    #[track_caller]
    pub fn with_max_readers(value: T, max_reads: u32) -> RwLock<T>
    where
        T: Sized,
    {
        // `MAX_READS` is the largest permit count the semaphore can represent,
        // so larger values must be rejected up front.
        assert!(
            max_reads <= MAX_READS,
            "a RwLock may not be created with more than {MAX_READS} readers"
        );

        // When instrumentation is enabled, create a resource span recording
        // the construction site and the lock's initial state.
        #[cfg(all(tokio_unstable, feature = "tracing"))]
        let resource_span = {
            let location = std::panic::Location::caller();

            let resource_span = tracing::trace_span!(
                parent: None,
                "runtime.resource",
                concrete_type = "RwLock",
                kind = "Sync",
                loc.file = location.file(),
                loc.line = location.line(),
                loc.col = location.column(),
            );

            resource_span.in_scope(|| {
                tracing::trace!(
                    target: "runtime::resource::state_update",
                    max_readers = max_reads,
                );

                tracing::trace!(
                    target: "runtime::resource::state_update",
                    write_locked = false,
                );

                tracing::trace!(
                    target: "runtime::resource::state_update",
                    current_readers = 0,
                );
            });

            resource_span
        };

        #[cfg(all(tokio_unstable, feature = "tracing"))]
        let s = resource_span.in_scope(|| Semaphore::new(max_reads as usize));

        #[cfg(any(not(tokio_unstable), not(feature = "tracing")))]
        let s = Semaphore::new(max_reads as usize);

        RwLock {
            mr: max_reads,
            c: UnsafeCell::new(value),
            s,
            #[cfg(all(tokio_unstable, feature = "tracing"))]
            resource_span,
        }
    }
327
    /// Creates a new instance of an `RwLock<T>` which is unlocked.
    ///
    /// When using the `tracing` [unstable feature], a `RwLock` created with
    /// `const_new` will not be instrumented. As such, it will not be visible
    /// in [`tokio-console`]. Instead, [`RwLock::new`] should be used to create
    /// an instrumented object if that is needed.
    ///
    /// # Examples
    ///
    /// ```
    /// use tokio::sync::RwLock;
    ///
    /// static LOCK: RwLock<i32> = RwLock::const_new(5);
    /// ```
    ///
    /// [`tokio-console`]: https://github.com/tokio-rs/console
    /// [unstable feature]: crate#unstable-features
    #[cfg(not(all(loom, test)))]
    pub const fn const_new(value: T) -> RwLock<T>
    where
        T: Sized,
    {
        RwLock {
            mr: MAX_READS,
            c: UnsafeCell::new(value),
            s: Semaphore::const_new(MAX_READS as usize),
            // A disabled span: `const fn` cannot construct a real tracing span.
            #[cfg(all(tokio_unstable, feature = "tracing"))]
            resource_span: tracing::Span::none(),
        }
    }
358
359 /// Creates a new instance of an `RwLock<T>` which is unlocked
360 /// and allows a maximum of `max_reads` concurrent readers.
361 ///
362 /// # Examples
363 ///
364 /// ```
365 /// use tokio::sync::RwLock;
366 ///
367 /// static LOCK: RwLock<i32> = RwLock::const_with_max_readers(5, 1024);
368 /// ```
369 #[cfg(not(all(loom, test)))]
370 pub const fn const_with_max_readers(value: T, max_reads: u32) -> RwLock<T>
371 where
372 T: Sized,
373 {
374 assert!(max_reads <= MAX_READS);
375
376 RwLock {
377 mr: max_reads,
378 c: UnsafeCell::new(value),
379 s: Semaphore::const_new(max_reads as usize),
380 #[cfg(all(tokio_unstable, feature = "tracing"))]
381 resource_span: tracing::Span::none(),
382 }
383 }
384
    /// Locks this `RwLock` with shared read access, causing the current task
    /// to yield until the lock has been acquired.
    ///
    /// The calling task will yield until there are no writers which hold the
    /// lock. There may be other readers inside the lock when the task resumes.
    ///
    /// Note that under the priority policy of [`RwLock`], read locks are not
    /// granted until prior write locks, to prevent starvation. Therefore
    /// deadlock may occur if a read lock is held by the current task, a write
    /// lock attempt is made, and then a subsequent read lock attempt is made
    /// by the current task.
    ///
    /// Returns an RAII guard which will drop this read access of the `RwLock`
    /// when dropped.
    ///
    /// # Cancel safety
    ///
    /// This method uses a queue to fairly distribute locks in the order they
    /// were requested. Cancelling a call to `read` makes you lose your place in
    /// the queue.
    ///
    /// # Examples
    ///
    /// ```
    /// use std::sync::Arc;
    /// use tokio::sync::RwLock;
    ///
    /// # #[tokio::main(flavor = "current_thread")]
    /// # async fn main() {
    /// let lock = Arc::new(RwLock::new(1));
    /// let c_lock = lock.clone();
    ///
    /// let n = lock.read().await;
    /// assert_eq!(*n, 1);
    ///
    /// tokio::spawn(async move {
    ///     // While main has an active read lock, we acquire one too.
    ///     let r = c_lock.read().await;
    ///     assert_eq!(*r, 1);
    /// }).await.expect("The spawned task has panicked");
    ///
    /// // Drop the guard after the spawned task finishes.
    /// drop(n);
    /// # }
    /// ```
    pub async fn read(&self) -> RwLockReadGuard<'_, T> {
        // A reader needs exactly one permit; the guard releases it on drop.
        let acquire_fut = async {
            self.s.acquire(1).await.unwrap_or_else(|_| {
                // The semaphore was closed. but, we never explicitly close it, and we have a
                // handle to it through the Arc, which means that this can never happen.
                unreachable!()
            });

            RwLockReadGuard {
                s: &self.s,
                data: self.c.get(),
                marker: PhantomData,
                #[cfg(all(tokio_unstable, feature = "tracing"))]
                resource_span: self.resource_span.clone(),
            }
        };

        // When instrumentation is enabled, wrap the acquire future so its
        // polling is traced as an async op.
        #[cfg(all(tokio_unstable, feature = "tracing"))]
        let acquire_fut = trace::async_op(
            move || acquire_fut,
            self.resource_span.clone(),
            "RwLock::read",
            "poll",
            false,
        );

        #[allow(clippy::let_and_return)] // this lint triggers when disabling tracing
        let guard = acquire_fut.await;

        // Publish the reader-count increase as a resource state update.
        #[cfg(all(tokio_unstable, feature = "tracing"))]
        self.resource_span.in_scope(|| {
            tracing::trace!(
                target: "runtime::resource::state_update",
                current_readers = 1,
                current_readers.op = "add",
            )
        });

        guard
    }
470
    /// Blockingly locks this `RwLock` with shared read access.
    ///
    /// This method is intended for use cases where you
    /// need to use this rwlock in asynchronous code as well as in synchronous code.
    ///
    /// Returns an RAII guard which will drop the read access of this `RwLock` when dropped.
    ///
    /// # Panics
    ///
    /// This function panics if called within an asynchronous execution context.
    ///
    /// - If you find yourself in an asynchronous execution context and needing
    ///   to call some (synchronous) function which performs one of these
    ///   `blocking_` operations, then consider wrapping that call inside
    ///   [`spawn_blocking()`][crate::runtime::Handle::spawn_blocking]
    ///   (or [`block_in_place()`][crate::task::block_in_place]).
    ///
    /// # Examples
    ///
    /// ```
    /// # #[cfg(not(target_family = "wasm"))]
    /// # {
    /// use std::sync::Arc;
    /// use tokio::sync::RwLock;
    ///
    /// #[tokio::main]
    /// async fn main() {
    ///     let rwlock = Arc::new(RwLock::new(1));
    ///     let mut write_lock = rwlock.write().await;
    ///
    ///     let blocking_task = tokio::task::spawn_blocking({
    ///         let rwlock = Arc::clone(&rwlock);
    ///         move || {
    ///             // This shall block until the `write_lock` is released.
    ///             let read_lock = rwlock.blocking_read();
    ///             assert_eq!(*read_lock, 0);
    ///         }
    ///     });
    ///
    ///     *write_lock -= 1;
    ///     drop(write_lock); // release the lock.
    ///
    ///     // Await the completion of the blocking task.
    ///     blocking_task.await.unwrap();
    ///
    ///     // Assert uncontended.
    ///     assert!(rwlock.try_write().is_ok());
    /// }
    /// # }
    /// ```
    #[track_caller]
    #[cfg(feature = "sync")]
    pub fn blocking_read(&self) -> RwLockReadGuard<'_, T> {
        // `block_on` drives the async `read` to completion on the current
        // thread; it is what panics when called from an async context.
        crate::future::block_on(self.read())
    }
526
    /// Locks this `RwLock` with shared read access, causing the current task
    /// to yield until the lock has been acquired.
    ///
    /// The calling task will yield until there are no writers which hold the
    /// lock. There may be other readers inside the lock when the task resumes.
    ///
    /// This method is identical to [`RwLock::read`], except that the returned
    /// guard references the `RwLock` with an [`Arc`] rather than by borrowing
    /// it. Therefore, the `RwLock` must be wrapped in an `Arc` to call this
    /// method, and the guard will live for the `'static` lifetime, as it keeps
    /// the `RwLock` alive by holding an `Arc`.
    ///
    /// Note that under the priority policy of [`RwLock`], read locks are not
    /// granted until prior write locks, to prevent starvation. Therefore
    /// deadlock may occur if a read lock is held by the current task, a write
    /// lock attempt is made, and then a subsequent read lock attempt is made
    /// by the current task.
    ///
    /// Returns an RAII guard which will drop this read access of the `RwLock`
    /// when dropped.
    ///
    /// # Cancel safety
    ///
    /// This method uses a queue to fairly distribute locks in the order they
    /// were requested. Cancelling a call to `read_owned` makes you lose your
    /// place in the queue.
    ///
    /// # Examples
    ///
    /// ```
    /// use std::sync::Arc;
    /// use tokio::sync::RwLock;
    ///
    /// # #[tokio::main(flavor = "current_thread")]
    /// # async fn main() {
    /// let lock = Arc::new(RwLock::new(1));
    /// let c_lock = lock.clone();
    ///
    /// let n = lock.read_owned().await;
    /// assert_eq!(*n, 1);
    ///
    /// tokio::spawn(async move {
    ///     // While main has an active read lock, we acquire one too.
    ///     let r = c_lock.read_owned().await;
    ///     assert_eq!(*r, 1);
    /// }).await.expect("The spawned task has panicked");
    ///
    /// // Drop the guard after the spawned task finishes.
    /// drop(n);
    /// # }
    /// ```
    pub async fn read_owned(self: Arc<Self>) -> OwnedRwLockReadGuard<T> {
        // Clone the span up front: `self` is moved into the guard below, so
        // the tracing wrapper needs its own handle.
        #[cfg(all(tokio_unstable, feature = "tracing"))]
        let resource_span = self.resource_span.clone();

        let acquire_fut = async {
            self.s.acquire(1).await.unwrap_or_else(|_| {
                // The semaphore was closed. but, we never explicitly close it, and we have a
                // handle to it through the Arc, which means that this can never happen.
                unreachable!()
            });

            OwnedRwLockReadGuard {
                #[cfg(all(tokio_unstable, feature = "tracing"))]
                resource_span: self.resource_span.clone(),
                data: self.c.get(),
                lock: self,
                _p: PhantomData,
            }
        };

        #[cfg(all(tokio_unstable, feature = "tracing"))]
        let acquire_fut = trace::async_op(
            move || acquire_fut,
            resource_span,
            "RwLock::read_owned",
            "poll",
            false,
        );

        #[allow(clippy::let_and_return)] // this lint triggers when disabling tracing
        let guard = acquire_fut.await;

        // Publish the reader-count increase as a resource state update.
        #[cfg(all(tokio_unstable, feature = "tracing"))]
        guard.resource_span.in_scope(|| {
            tracing::trace!(
                target: "runtime::resource::state_update",
                current_readers = 1,
                current_readers.op = "add",
            )
        });

        guard
    }
621
    /// Attempts to acquire this `RwLock` with shared read access.
    ///
    /// If the access couldn't be acquired immediately, returns [`TryLockError`].
    /// Otherwise, an RAII guard is returned which will release read access
    /// when dropped.
    ///
    /// [`TryLockError`]: TryLockError
    ///
    /// # Examples
    ///
    /// ```
    /// use std::sync::Arc;
    /// use tokio::sync::RwLock;
    ///
    /// # #[tokio::main(flavor = "current_thread")]
    /// # async fn main() {
    /// let lock = Arc::new(RwLock::new(1));
    /// let c_lock = lock.clone();
    ///
    /// let v = lock.try_read().unwrap();
    /// assert_eq!(*v, 1);
    ///
    /// tokio::spawn(async move {
    ///     // While main has an active read lock, we acquire one too.
    ///     let n = c_lock.read().await;
    ///     assert_eq!(*n, 1);
    /// }).await.expect("The spawned task has panicked");
    ///
    /// // Drop the guard when spawned task finishes.
    /// drop(v);
    /// # }
    /// ```
    pub fn try_read(&self) -> Result<RwLockReadGuard<'_, T>, TryLockError> {
        // Grab a single read permit without waiting. On success the value is
        // `()`; the guard below is what releases the permit on drop.
        match self.s.try_acquire(1) {
            Ok(permit) => permit,
            Err(TryAcquireError::NoPermits) => return Err(TryLockError(())),
            // We never close the semaphore, so this cannot happen.
            Err(TryAcquireError::Closed) => unreachable!(),
        }

        let guard = RwLockReadGuard {
            s: &self.s,
            data: self.c.get(),
            marker: marker::PhantomData,
            #[cfg(all(tokio_unstable, feature = "tracing"))]
            resource_span: self.resource_span.clone(),
        };

        // Publish the reader-count increase as a resource state update.
        #[cfg(all(tokio_unstable, feature = "tracing"))]
        self.resource_span.in_scope(|| {
            tracing::trace!(
                target: "runtime::resource::state_update",
                current_readers = 1,
                current_readers.op = "add",
            )
        });

        Ok(guard)
    }
680
    /// Attempts to acquire this `RwLock` with shared read access.
    ///
    /// If the access couldn't be acquired immediately, returns [`TryLockError`].
    /// Otherwise, an RAII guard is returned which will release read access
    /// when dropped.
    ///
    /// This method is identical to [`RwLock::try_read`], except that the
    /// returned guard references the `RwLock` with an [`Arc`] rather than by
    /// borrowing it. Therefore, the `RwLock` must be wrapped in an `Arc` to
    /// call this method, and the guard will live for the `'static` lifetime,
    /// as it keeps the `RwLock` alive by holding an `Arc`.
    ///
    /// [`TryLockError`]: TryLockError
    ///
    /// # Examples
    ///
    /// ```
    /// use std::sync::Arc;
    /// use tokio::sync::RwLock;
    ///
    /// # #[tokio::main(flavor = "current_thread")]
    /// # async fn main() {
    /// let lock = Arc::new(RwLock::new(1));
    /// let c_lock = lock.clone();
    ///
    /// let v = lock.try_read_owned().unwrap();
    /// assert_eq!(*v, 1);
    ///
    /// tokio::spawn(async move {
    ///     // While main has an active read lock, we acquire one too.
    ///     let n = c_lock.read_owned().await;
    ///     assert_eq!(*n, 1);
    /// }).await.expect("The spawned task has panicked");
    ///
    /// // Drop the guard when spawned task finishes.
    /// drop(v);
    /// # }
    /// ```
    pub fn try_read_owned(self: Arc<Self>) -> Result<OwnedRwLockReadGuard<T>, TryLockError> {
        // Grab a single read permit without waiting; the guard releases it on
        // drop.
        match self.s.try_acquire(1) {
            Ok(permit) => permit,
            Err(TryAcquireError::NoPermits) => return Err(TryLockError(())),
            // We never close the semaphore, so this cannot happen.
            Err(TryAcquireError::Closed) => unreachable!(),
        }

        let guard = OwnedRwLockReadGuard {
            #[cfg(all(tokio_unstable, feature = "tracing"))]
            resource_span: self.resource_span.clone(),
            data: self.c.get(),
            lock: self,
            _p: PhantomData,
        };

        // Publish the reader-count increase as a resource state update.
        #[cfg(all(tokio_unstable, feature = "tracing"))]
        guard.resource_span.in_scope(|| {
            tracing::trace!(
                target: "runtime::resource::state_update",
                current_readers = 1,
                current_readers.op = "add",
            )
        });

        Ok(guard)
    }
745
    /// Locks this `RwLock` with exclusive write access, causing the current
    /// task to yield until the lock has been acquired.
    ///
    /// The calling task will yield while other writers or readers currently
    /// have access to the lock.
    ///
    /// Returns an RAII guard which will drop the write access of this `RwLock`
    /// when dropped.
    ///
    /// # Cancel safety
    ///
    /// This method uses a queue to fairly distribute locks in the order they
    /// were requested. Cancelling a call to `write` makes you lose your place
    /// in the queue.
    ///
    /// # Examples
    ///
    /// ```
    /// use tokio::sync::RwLock;
    ///
    /// # #[tokio::main(flavor = "current_thread")]
    /// # async fn main() {
    /// let lock = RwLock::new(1);
    ///
    /// let mut n = lock.write().await;
    /// *n = 2;
    /// # }
    /// ```
    pub async fn write(&self) -> RwLockWriteGuard<'_, T> {
        // A writer takes every permit (`mr`), which excludes all readers and
        // other writers until the guard releases them.
        let acquire_fut = async {
            self.s.acquire(self.mr as usize).await.unwrap_or_else(|_| {
                // The semaphore was closed. but, we never explicitly close it, and we have a
                // handle to it through the Arc, which means that this can never happen.
                unreachable!()
            });

            RwLockWriteGuard {
                permits_acquired: self.mr,
                s: &self.s,
                data: self.c.get(),
                marker: marker::PhantomData,
                #[cfg(all(tokio_unstable, feature = "tracing"))]
                resource_span: self.resource_span.clone(),
            }
        };

        // When instrumentation is enabled, wrap the acquire future so its
        // polling is traced as an async op.
        #[cfg(all(tokio_unstable, feature = "tracing"))]
        let acquire_fut = trace::async_op(
            move || acquire_fut,
            self.resource_span.clone(),
            "RwLock::write",
            "poll",
            false,
        );

        #[allow(clippy::let_and_return)] // this lint triggers when disabling tracing
        let guard = acquire_fut.await;

        // Publish the write-locked state as a resource state update.
        #[cfg(all(tokio_unstable, feature = "tracing"))]
        self.resource_span.in_scope(|| {
            tracing::trace!(
                target: "runtime::resource::state_update",
                write_locked = true,
                write_locked.op = "override",
            )
        });

        guard
    }
815
    /// Blockingly locks this `RwLock` with exclusive write access.
    ///
    /// This method is intended for use cases where you
    /// need to use this rwlock in asynchronous code as well as in synchronous code.
    ///
    /// Returns an RAII guard which will drop the write access of this `RwLock` when dropped.
    ///
    /// # Panics
    ///
    /// This function panics if called within an asynchronous execution context.
    ///
    /// - If you find yourself in an asynchronous execution context and needing
    ///   to call some (synchronous) function which performs one of these
    ///   `blocking_` operations, then consider wrapping that call inside
    ///   [`spawn_blocking()`][crate::runtime::Handle::spawn_blocking]
    ///   (or [`block_in_place()`][crate::task::block_in_place]).
    ///
    /// # Examples
    ///
    /// ```
    /// # #[cfg(not(target_family = "wasm"))]
    /// # {
    /// use std::sync::Arc;
    /// use tokio::{sync::RwLock};
    ///
    /// #[tokio::main]
    /// async fn main() {
    ///     let rwlock = Arc::new(RwLock::new(1));
    ///     let read_lock = rwlock.read().await;
    ///
    ///     let blocking_task = tokio::task::spawn_blocking({
    ///         let rwlock = Arc::clone(&rwlock);
    ///         move || {
    ///             // This shall block until the `read_lock` is released.
    ///             let mut write_lock = rwlock.blocking_write();
    ///             *write_lock = 2;
    ///         }
    ///     });
    ///
    ///     assert_eq!(*read_lock, 1);
    ///     // Release the last outstanding read lock.
    ///     drop(read_lock);
    ///
    ///     // Await the completion of the blocking task.
    ///     blocking_task.await.unwrap();
    ///
    ///     // Assert uncontended.
    ///     let read_lock = rwlock.try_read().unwrap();
    ///     assert_eq!(*read_lock, 2);
    /// }
    /// # }
    /// ```
    #[track_caller]
    #[cfg(feature = "sync")]
    pub fn blocking_write(&self) -> RwLockWriteGuard<'_, T> {
        // `block_on` drives the async `write` to completion on the current
        // thread; it is what panics when called from an async context.
        crate::future::block_on(self.write())
    }
873
    /// Locks this `RwLock` with exclusive write access, causing the current
    /// task to yield until the lock has been acquired.
    ///
    /// The calling task will yield while other writers or readers currently
    /// have access to the lock.
    ///
    /// This method is identical to [`RwLock::write`], except that the returned
    /// guard references the `RwLock` with an [`Arc`] rather than by borrowing
    /// it. Therefore, the `RwLock` must be wrapped in an `Arc` to call this
    /// method, and the guard will live for the `'static` lifetime, as it keeps
    /// the `RwLock` alive by holding an `Arc`.
    ///
    /// Returns an RAII guard which will drop the write access of this `RwLock`
    /// when dropped.
    ///
    /// # Cancel safety
    ///
    /// This method uses a queue to fairly distribute locks in the order they
    /// were requested. Cancelling a call to `write_owned` makes you lose your
    /// place in the queue.
    ///
    /// # Examples
    ///
    /// ```
    /// use std::sync::Arc;
    /// use tokio::sync::RwLock;
    ///
    /// # #[tokio::main(flavor = "current_thread")]
    /// # async fn main() {
    /// let lock = Arc::new(RwLock::new(1));
    ///
    /// let mut n = lock.write_owned().await;
    /// *n = 2;
    /// # }
    /// ```
    pub async fn write_owned(self: Arc<Self>) -> OwnedRwLockWriteGuard<T> {
        // Clone the span up front: `self` is moved into the guard below, so
        // the tracing wrapper needs its own handle.
        #[cfg(all(tokio_unstable, feature = "tracing"))]
        let resource_span = self.resource_span.clone();

        // A writer takes every permit (`mr`), excluding all readers and other
        // writers until the guard releases them.
        let acquire_fut = async {
            self.s.acquire(self.mr as usize).await.unwrap_or_else(|_| {
                // The semaphore was closed. but, we never explicitly close it, and we have a
                // handle to it through the Arc, which means that this can never happen.
                unreachable!()
            });

            OwnedRwLockWriteGuard {
                #[cfg(all(tokio_unstable, feature = "tracing"))]
                resource_span: self.resource_span.clone(),
                permits_acquired: self.mr,
                data: self.c.get(),
                lock: self,
                _p: PhantomData,
            }
        };

        #[cfg(all(tokio_unstable, feature = "tracing"))]
        let acquire_fut = trace::async_op(
            move || acquire_fut,
            resource_span,
            "RwLock::write_owned",
            "poll",
            false,
        );

        #[allow(clippy::let_and_return)] // this lint triggers when disabling tracing
        let guard = acquire_fut.await;

        // Publish the write-locked state as a resource state update.
        #[cfg(all(tokio_unstable, feature = "tracing"))]
        guard.resource_span.in_scope(|| {
            tracing::trace!(
                target: "runtime::resource::state_update",
                write_locked = true,
                write_locked.op = "override",
            )
        });

        guard
    }
953
    /// Attempts to acquire this `RwLock` with exclusive write access.
    ///
    /// If the access couldn't be acquired immediately, returns [`TryLockError`].
    /// Otherwise, an RAII guard is returned which will release write access
    /// when dropped.
    ///
    /// [`TryLockError`]: TryLockError
    ///
    /// # Examples
    ///
    /// ```
    /// use tokio::sync::RwLock;
    ///
    /// # #[tokio::main(flavor = "current_thread")]
    /// # async fn main() {
    /// let rw = RwLock::new(1);
    ///
    /// let v = rw.read().await;
    /// assert_eq!(*v, 1);
    ///
    /// assert!(rw.try_write().is_err());
    /// # }
    /// ```
    pub fn try_write(&self) -> Result<RwLockWriteGuard<'_, T>, TryLockError> {
        // A writer needs all `mr` permits at once; if any reader or writer
        // holds one, this fails immediately instead of waiting.
        match self.s.try_acquire(self.mr as usize) {
            Ok(permit) => permit,
            Err(TryAcquireError::NoPermits) => return Err(TryLockError(())),
            // We never close the semaphore, so this cannot happen.
            Err(TryAcquireError::Closed) => unreachable!(),
        }

        let guard = RwLockWriteGuard {
            permits_acquired: self.mr,
            s: &self.s,
            data: self.c.get(),
            marker: marker::PhantomData,
            #[cfg(all(tokio_unstable, feature = "tracing"))]
            resource_span: self.resource_span.clone(),
        };

        // Publish the write-locked state as a resource state update.
        #[cfg(all(tokio_unstable, feature = "tracing"))]
        self.resource_span.in_scope(|| {
            tracing::trace!(
                target: "runtime::resource::state_update",
                write_locked = true,
                write_locked.op = "override",
            )
        });

        Ok(guard)
    }
1004
    /// Attempts to acquire this `RwLock` with exclusive write access.
    ///
    /// If the access couldn't be acquired immediately, returns [`TryLockError`].
    /// Otherwise, an RAII guard is returned which will release write access
    /// when dropped.
    ///
    /// This method is identical to [`RwLock::try_write`], except that the
    /// returned guard references the `RwLock` with an [`Arc`] rather than by
    /// borrowing it. Therefore, the `RwLock` must be wrapped in an `Arc` to
    /// call this method, and the guard will live for the `'static` lifetime,
    /// as it keeps the `RwLock` alive by holding an `Arc`.
    ///
    /// [`TryLockError`]: TryLockError
    ///
    /// # Examples
    ///
    /// ```
    /// use std::sync::Arc;
    /// use tokio::sync::RwLock;
    ///
    /// # #[tokio::main(flavor = "current_thread")]
    /// # async fn main() {
    /// let rw = Arc::new(RwLock::new(1));
    ///
    /// let v = Arc::clone(&rw).read_owned().await;
    /// assert_eq!(*v, 1);
    ///
    /// assert!(rw.try_write_owned().is_err());
    /// # }
    /// ```
    pub fn try_write_owned(self: Arc<Self>) -> Result<OwnedRwLockWriteGuard<T>, TryLockError> {
        // A writer needs all `mr` permits at once; if any reader or writer
        // holds one, this fails immediately instead of waiting.
        match self.s.try_acquire(self.mr as usize) {
            Ok(permit) => permit,
            Err(TryAcquireError::NoPermits) => return Err(TryLockError(())),
            // We never close the semaphore, so this cannot happen.
            Err(TryAcquireError::Closed) => unreachable!(),
        }

        let guard = OwnedRwLockWriteGuard {
            #[cfg(all(tokio_unstable, feature = "tracing"))]
            resource_span: self.resource_span.clone(),
            permits_acquired: self.mr,
            data: self.c.get(),
            lock: self,
            _p: PhantomData,
        };

        // Publish the write-locked state as a resource state update.
        #[cfg(all(tokio_unstable, feature = "tracing"))]
        guard.resource_span.in_scope(|| {
            tracing::trace!(
                target: "runtime::resource::state_update",
                write_locked = true,
                write_locked.op = "override",
            )
        });

        Ok(guard)
    }
1062
    /// Returns a mutable reference to the underlying data.
    ///
    /// Since this call borrows the `RwLock` mutably, no actual locking needs to
    /// take place -- the mutable borrow statically guarantees no locks exist.
    ///
    /// # Examples
    ///
    /// ```
    /// use tokio::sync::RwLock;
    ///
    /// fn main() {
    ///     let mut lock = RwLock::new(1);
    ///
    ///     let n = lock.get_mut();
    ///     *n = 2;
    /// }
    /// ```
    pub fn get_mut(&mut self) -> &mut T {
        // `&mut self` proves no guards are alive, so the semaphore can be
        // bypassed entirely.
        self.c.get_mut()
    }
1083
    /// Consumes the lock, returning the underlying data.
    pub fn into_inner(self) -> T
    where
        T: Sized,
    {
        // Taking `self` by value statically guarantees no guards are
        // outstanding, so the data can be extracted directly.
        self.c.into_inner()
    }
1091}
1092
1093impl<T> From<T> for RwLock<T> {
1094 fn from(s: T) -> Self {
1095 Self::new(s)
1096 }
1097}
1098
1099impl<T> Default for RwLock<T>
1100where
1101 T: Default,
1102{
1103 fn default() -> Self {
1104 Self::new(T::default())
1105 }
1106}
1107
1108impl<T: ?Sized> std::fmt::Debug for RwLock<T>
1109where
1110 T: std::fmt::Debug,
1111{
1112 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1113 let mut d = f.debug_struct("RwLock");
1114 match self.try_read() {
1115 Ok(inner) => d.field("data", &&*inner),
1116 Err(_) => d.field("data", &format_args!("<locked>")),
1117 };
1118 d.finish()
1119 }
1120}