-
-
Notifications
You must be signed in to change notification settings - Fork 104
ref: make Context::alloc_ongoing return a guard #4248
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Closed
Closed
Changes from 1 commit
Commits
Show all changes
7 commits
Select commit
Hold shift + click to select a range
0c5d183
ref: make Context::alloc_ongoing return a guard
flub 201d05d
Remove Context::free_ongoing function
flub 32629b9
Merge branch 'master' into flub/ongoing-guard
flub 61b00f9
typo
flub b7edd4e
Merge branch 'master' into flub/ongoing-guard
flub 4aa248d
Merge branch 'master' into flub/ongoing-guard
flub b9fd529
Re-add info message using elapsed stopping time
flub File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2,16 +2,19 @@ | |
|
||
use std::collections::{BTreeMap, HashMap}; | ||
use std::ffi::OsString; | ||
use std::future::Future; | ||
use std::ops::Deref; | ||
use std::path::{Path, PathBuf}; | ||
use std::pin::Pin; | ||
use std::sync::atomic::AtomicBool; | ||
use std::sync::Arc; | ||
use std::task::Poll; | ||
use std::time::{Duration, Instant, SystemTime}; | ||
|
||
use anyhow::{bail, ensure, Context as _, Result}; | ||
use async_channel::{self as channel, Receiver, Sender}; | ||
use async_channel::Sender; | ||
use ratelimit::Ratelimit; | ||
use tokio::sync::{Mutex, RwLock}; | ||
use tokio::sync::{oneshot, Mutex, RwLock}; | ||
use tokio::task; | ||
|
||
use crate::chat::{get_chat_cnt, ChatId}; | ||
|
@@ -257,7 +260,7 @@ pub(crate) struct DebugLogging { | |
#[derive(Debug)] | ||
enum RunningState { | ||
/// Ongoing process is allocated. | ||
Running { cancel_sender: Sender<()> }, | ||
Running { cancel_sender: oneshot::Sender<()> }, | ||
|
||
/// Cancel signal has been sent, waiting for ongoing process to be freed. | ||
ShallStop, | ||
|
@@ -511,21 +514,35 @@ impl Context { | |
/// This is for modal operations during which no other user actions are allowed. Only | ||
/// one such operation is allowed at any given time. | ||
/// | ||
/// The return value is a cancel token, which will release the ongoing mutex when | ||
/// dropped. | ||
pub(crate) async fn alloc_ongoing(&self) -> Result<Receiver<()>> { | ||
/// The return value is a guard which does two things: | ||
/// | ||
/// - It is a Future which will complete when the ongoing process is cancelled using | ||
/// [`Context::stop_ongoing`] and must stop. | ||
/// - It will free the ongoing process, aka release the mutex, when dropped. | ||
pub(crate) async fn alloc_ongoing(&self) -> Result<OngoingGuard> { | ||
let mut s = self.running_state.write().await; | ||
ensure!( | ||
matches!(*s, RunningState::Stopped), | ||
"There is already another ongoing process running." | ||
); | ||
|
||
let (sender, receiver) = channel::bounded(1); | ||
let (cancel_tx, cancel_rx) = oneshot::channel(); | ||
*s = RunningState::Running { | ||
cancel_sender: sender, | ||
cancel_sender: cancel_tx, | ||
}; | ||
let (drop_tx, drop_rx) = oneshot::channel(); | ||
let context = self.clone(); | ||
|
||
tokio::spawn(async move { | ||
drop_rx.await.ok(); | ||
let mut s = context.running_state.write().await; | ||
*s = RunningState::Stopped; | ||
}); | ||
|
||
Ok(receiver) | ||
Ok(OngoingGuard { | ||
cancel_rx, | ||
drop_tx: Some(drop_tx), | ||
}) | ||
} | ||
|
||
pub(crate) async fn free_ongoing(&self) { | ||
|
@@ -536,21 +553,24 @@ impl Context { | |
/// Signal an ongoing process to stop. | ||
pub async fn stop_ongoing(&self) { | ||
let mut s = self.running_state.write().await; | ||
match &*s { | ||
RunningState::Running { cancel_sender } => { | ||
if let Err(err) = cancel_sender.send(()).await { | ||
warn!(self, "could not cancel ongoing: {:#}", err); | ||
} | ||
info!(self, "Signaling the ongoing process to stop ASAP.",); | ||
*s = RunningState::ShallStop; | ||
} | ||
|
||
// Take out the state so we can call the oneshot sender (which takes ownership). | ||
let current_state = std::mem::replace(&mut *s, RunningState::ShallStop); | ||
|
||
match current_state { | ||
RunningState::Running { cancel_sender } => match cancel_sender.send(()) { | ||
Ok(()) => info!(self, "Signaling the ongoing process to stop ASAP."), | ||
Err(()) => warn!(self, "could not cancel ongoing"), | ||
}, | ||
RunningState::ShallStop | RunningState::Stopped => { | ||
// Put back the current state | ||
*s = current_state; | ||
info!(self, "No ongoing process to stop.",); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is this log correct for There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Kind off, yes. The ongoing process is already requested to stop so there's no ongoing process to stop. 🤷 |
||
} | ||
} | ||
} | ||
|
||
#[allow(unused)] | ||
#[cfg(test)] | ||
pub(crate) async fn shall_stop_ongoing(&self) -> bool { | ||
match &*self.running_state.read().await { | ||
RunningState::Running { .. } => false, | ||
|
@@ -945,6 +965,54 @@ impl Context { | |
} | ||
} | ||
|
||
/// Guard received when calling [`Context::alloc_ongoing`]. | ||
/// | ||
/// While holding this guard the ongoing mutex is held, dropping this guard frees the | ||
/// ongoing process. | ||
/// | ||
/// The ongoing process can also be cancelled by unrelated code calling | ||
/// [`Context::stop_ongoing`]. This guard implements [`Future`] and the future will | ||
/// complete when the ongoing process is cancelled and must be aborted. Freeing the ongoing | ||
/// process works as usual in this case: when this guard is dropped. So if you need to do | ||
/// some more work before freeing make sure to keep ownership of the guard, e.g.: | ||
/// | ||
/// ```no_compile | ||
/// let mut guard = context.alloc_ongoing().await?; | ||
/// tokio::select!{ | ||
/// biased; | ||
/// _ = &mut guard => (), // guard is not moved, so we keep ownership. | ||
/// _ = do_work() => (), | ||
/// }; | ||
/// do_cleaup().await; | ||
/// drop(guard); | ||
/// ``` | ||
pub(crate) struct OngoingGuard { | ||
/// Receives a message when the ongoing process should be cancelled. | ||
cancel_rx: oneshot::Receiver<()>, | ||
/// Used by `Drop` to send a message which will free the ongoing process. | ||
drop_tx: Option<oneshot::Sender<()>>, | ||
} | ||
|
||
impl Future for OngoingGuard { | ||
type Output = (); | ||
|
||
fn poll(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> { | ||
match Pin::new(&mut self.cancel_rx).poll(cx) { | ||
Poll::Ready(_) => Poll::Ready(()), | ||
Poll::Pending => Poll::Pending, | ||
} | ||
} | ||
} | ||
|
||
impl Drop for OngoingGuard { | ||
fn drop(&mut self) { | ||
if let Some(sender) = self.drop_tx.take() { | ||
// TODO: Maybe this should log? But we'd need to have a context. | ||
sender.send(()).ok(); | ||
} | ||
} | ||
} | ||
|
||
/// Returns core version as a string. | ||
pub fn get_version_str() -> &'static str { | ||
&DC_VERSION_STR | ||
|
@@ -1409,38 +1477,52 @@ mod tests { | |
async fn test_ongoing() -> Result<()> { | ||
let context = TestContext::new().await; | ||
|
||
// No ongoing process allocated. | ||
println!("No ongoing process allocated."); | ||
assert!(context.shall_stop_ongoing().await); | ||
|
||
let receiver = context.alloc_ongoing().await?; | ||
let mut guard = context.alloc_ongoing().await?; | ||
|
||
// Cannot allocate another ongoing process while the first one is running. | ||
println!("Cannot allocate another ongoing process while the first one is running."); | ||
assert!(context.alloc_ongoing().await.is_err()); | ||
|
||
// Stop signal is not sent yet. | ||
assert!(receiver.try_recv().is_err()); | ||
println!("Stop signal is not sent yet."); | ||
assert!(matches!(futures::poll!(&mut guard), Poll::Pending)); | ||
|
||
assert!(!context.shall_stop_ongoing().await); | ||
|
||
// Send the stop signal. | ||
println!("Send the stop signal."); | ||
context.stop_ongoing().await; | ||
|
||
// Receive stop signal. | ||
receiver.recv().await?; | ||
println!("Receive stop signal."); | ||
(&mut guard).await; | ||
|
||
assert!(context.shall_stop_ongoing().await); | ||
|
||
// Ongoing process is still running even though stop signal was received, | ||
// so another one cannot be allocated. | ||
println!("Ongoing process still running even though stop signal was received"); | ||
assert!(context.alloc_ongoing().await.is_err()); | ||
|
||
context.free_ongoing().await; | ||
|
||
// No ongoing process allocated, should have been stopped already. | ||
assert!(context.shall_stop_ongoing().await); | ||
|
||
// Another ongoing process can be allocated now. | ||
let _receiver = context.alloc_ongoing().await?; | ||
println!("free the ongoing process"); | ||
// context.free_ongoing().await; | ||
drop(guard); | ||
|
||
println!("re-acquire the ongoing process"); | ||
// Since the drop guard needs to send a message and the receiving task must run and | ||
// acquire a lock this needs some time so won't succeed immediately. | ||
#[allow(clippy::async_yields_async)] | ||
let _guard = tokio::time::timeout(Duration::from_secs(10), async { | ||
loop { | ||
match context.alloc_ongoing().await { | ||
Ok(guard) => break guard, | ||
Err(_) => { | ||
// tokio::task::yield_now() results in a lot hotter loop, it takes a | ||
// lot of yields. | ||
tokio::time::sleep(Duration::from_millis(1)).await; | ||
} | ||
} | ||
} | ||
}) | ||
.await | ||
.expect("timeout"); | ||
|
||
Ok(()) | ||
} | ||
|
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe cancel_ongoing()? Because it doesn't actually wait for the ongoing process to stop
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, I agree that
cancel_ongoing()
would be a nicer name. I guess I could change that as well. I didn't touch it because it is a pub API though and I feel like if we try and rename it maybe it should also be renamed in the FFI and JSON-RPC APIs and now we're breaking all clients. That seemed like a bit too much work for a slightly unfortunate name.