Skip to content

Commit 7ed2737

Browse files
authored
sync: Add has_changed method to watch::Ref (#4758)
1 parent 79d8024 commit 7ed2737

File tree

1 file changed

+75
-8
lines changed

1 file changed

+75
-8
lines changed

tokio/src/sync/watch.rs

Lines changed: 75 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,55 @@ pub struct Sender<T> {
114114
#[derive(Debug)]
115115
pub struct Ref<'a, T> {
116116
inner: RwLockReadGuard<'a, T>,
117+
has_changed: bool,
118+
}
119+
120+
impl<'a, T> Ref<'a, T> {
121+
/// Indicates if the borrowed value is considered as _changed_ since the last
122+
/// time it has been marked as seen.
123+
///
124+
/// Unlike [`Receiver::has_changed()`], this method does not fail if the channel is closed.
125+
///
126+
/// When borrowed from the [`Sender`] this function will always return `false`.
127+
///
128+
/// # Examples
129+
///
130+
/// ```
131+
/// use tokio::sync::watch;
132+
///
133+
/// #[tokio::main]
134+
/// async fn main() {
135+
/// let (tx, mut rx) = watch::channel("hello");
136+
///
137+
/// tx.send("goodbye").unwrap();
138+
/// // The sender does never consider the value as changed.
139+
/// assert!(!tx.borrow().has_changed());
140+
///
141+
/// // Drop the sender immediately, just for testing purposes.
142+
/// drop(tx);
143+
///
144+
/// // Even if the sender has already been dropped...
145+
/// assert!(rx.has_changed().is_err());
146+
/// // ...the modified value is still readable and detected as changed.
147+
/// assert_eq!(*rx.borrow(), "goodbye");
148+
/// assert!(rx.borrow().has_changed());
149+
///
150+
/// // Read the changed value and mark it as seen.
151+
/// {
152+
/// let received = rx.borrow_and_update();
153+
/// assert_eq!(*received, "goodbye");
154+
/// assert!(received.has_changed());
155+
/// // Release the read lock when leaving this scope.
156+
/// }
157+
///
158+
/// // Now the value has already been marked as seen and could
159+
/// // never be modified again (after the sender has been dropped).
160+
/// assert!(!rx.borrow().has_changed());
161+
/// }
162+
/// ```
163+
pub fn has_changed(&self) -> bool {
164+
self.has_changed
165+
}
117166
}
118167

119168
#[derive(Debug)]
@@ -336,15 +385,21 @@ impl<T> Receiver<T> {
336385
/// ```
337386
pub fn borrow(&self) -> Ref<'_, T> {
338387
let inner = self.shared.value.read().unwrap();
339-
Ref { inner }
388+
389+
// After obtaining a read-lock no concurrent writes could occur
390+
// and the loaded version matches that of the borrowed reference.
391+
let new_version = self.shared.state.load().version();
392+
let has_changed = self.version != new_version;
393+
394+
Ref { inner, has_changed }
340395
}
341396

342-
/// Returns a reference to the most recently sent value and mark that value
397+
/// Returns a reference to the most recently sent value and marks that value
343398
/// as seen.
344399
///
345-
/// This method marks the value as seen, so [`changed`] will not return
346-
/// immediately if the newest value is one previously returned by
347-
/// `borrow_and_update`.
400+
/// This method marks the current value as seen. Subsequent calls to [`changed`]
401+
/// will not return immediately until the [`Sender`] has modified the shared
402+
/// value again.
348403
///
349404
/// Outstanding borrows hold a read lock. This means that long lived borrows
350405
/// could cause the send half to block. It is recommended to keep the borrow
@@ -372,8 +427,16 @@ impl<T> Receiver<T> {
372427
/// [`changed`]: Receiver::changed
373428
pub fn borrow_and_update(&mut self) -> Ref<'_, T> {
374429
let inner = self.shared.value.read().unwrap();
375-
self.version = self.shared.state.load().version();
376-
Ref { inner }
430+
431+
// After obtaining a read-lock no concurrent writes could occur
432+
// and the loaded version matches that of the borrowed reference.
433+
let new_version = self.shared.state.load().version();
434+
let has_changed = self.version != new_version;
435+
436+
// Mark the shared value as seen by updating the version
437+
self.version = new_version;
438+
439+
Ref { inner, has_changed }
377440
}
378441

379442
/// Checks if this channel contains a message that this receiver has not yet
@@ -731,7 +794,11 @@ impl<T> Sender<T> {
731794
/// ```
732795
pub fn borrow(&self) -> Ref<'_, T> {
733796
let inner = self.shared.value.read().unwrap();
734-
Ref { inner }
797+
798+
// The sender/producer always sees the current version
799+
let has_changed = false;
800+
801+
Ref { inner, has_changed }
735802
}
736803

737804
/// Checks if the channel has been closed. This happens when all receivers

0 commit comments

Comments
 (0)