Allow async events processing without holding total_consistency_lock
#2199

Changes from 1 commit
```diff
@@ -72,7 +72,7 @@ use core::{cmp, mem};
 use core::cell::RefCell;
 use crate::io::Read;
 use crate::sync::{Arc, Mutex, RwLock, RwLockReadGuard, FairRwLock, LockTestExt, LockHeldState};
-use core::sync::atomic::{AtomicUsize, Ordering};
+use core::sync::atomic::{AtomicUsize, AtomicBool, Ordering};
 use core::time::Duration;
 use core::ops::Deref;
```
```diff
@@ -926,6 +926,8 @@ where
 	/// See `ChannelManager` struct-level documentation for lock order requirements.
 	pending_events: Mutex<Vec<events::Event>>,
+	/// A simple atomic flag to ensure only one task at a time can be processing events asynchronously.
+	pending_events_processor: AtomicBool,
 	/// See `ChannelManager` struct-level documentation for lock order requirements.
 	pending_background_events: Mutex<Vec<BackgroundEvent>>,
 	/// Used when we have to take a BIG lock to make sure everything is self-consistent.
```
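For readers unfamiliar with this use of `AtomicBool`, here is a small standalone demonstration (not LDK code) of the try-lock semantics that `compare_exchange` gives the new field later in the diff:

```rust
use std::sync::atomic::{AtomicBool, Ordering};

fn main() {
    let busy = AtomicBool::new(false);

    // First caller atomically flips false -> true and wins the right to
    // be the sole event processor.
    assert!(busy.compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed).is_ok());

    // A second caller arriving meanwhile sees `true`, fails the exchange,
    // and can return early instead of blocking on a lock.
    assert!(busy.compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed).is_err());

    // The winner clears the flag when it is done.
    busy.store(false, Ordering::Release);
}
```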
```diff
@@ -1680,30 +1682,47 @@ macro_rules! handle_new_monitor_update {

 macro_rules! process_events_body {
 	($self: expr, $event_to_handle: expr, $handle_event: expr) => {
-		// We'll acquire our total consistency lock until the returned future completes so that
-		// we can be sure no other persists happen while processing events.
-		let _read_guard = $self.total_consistency_lock.read().unwrap();
+		let mut processed_all_events = false;
+		while !processed_all_events {
+			if $self.pending_events_processor.compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed).is_err() {
+				return;
+			}

-		let mut result = NotifyOption::SkipPersist;
+			let mut result = NotifyOption::SkipPersist;

-		// TODO: This behavior should be documented. It's unintuitive that we query
-		// ChannelMonitors when clearing other events.
-		if $self.process_pending_monitor_events() {
-			result = NotifyOption::DoPersist;
-		}
+			{
+				// We'll acquire our total consistency lock so that we can be sure no other
+				// persists happen while processing monitor events.
+				let _read_guard = $self.total_consistency_lock.read().unwrap();
+
+				// TODO: This behavior should be documented. It's unintuitive that we query
+				// ChannelMonitors when clearing other events.
+				if $self.process_pending_monitor_events() {
+					result = NotifyOption::DoPersist;
+				}
+			}

-		let pending_events = mem::replace(&mut *$self.pending_events.lock().unwrap(), vec![]);
-		if !pending_events.is_empty() {
-			result = NotifyOption::DoPersist;
-		}
+			let pending_events = $self.pending_events.lock().unwrap().clone();
+			let num_events = pending_events.len();
+			if !pending_events.is_empty() {
+				result = NotifyOption::DoPersist;
+			}

-		for event in pending_events {
-			$event_to_handle = event;
-			$handle_event;
-		}
+			for event in pending_events {
+				$event_to_handle = event;
+				$handle_event;
+			}

-		if result == NotifyOption::DoPersist {
-			$self.persistence_notifier.notify();
+			{
+				let mut pending_events = $self.pending_events.lock().unwrap();
+				pending_events.drain(..num_events);
+				processed_all_events = pending_events.is_empty();
+				$self.pending_events_processor.store(false, Ordering::Release);
+			}
+
+			if result == NotifyOption::DoPersist {
+				$self.persistence_notifier.notify();
+			}
 		}
 	}
 }
```

Review thread on the `$self.pending_events_processor.store(false, Ordering::Release);` line:

- Comment: "Should this only happen if `!processed_all_events`? Not a big deal either way, I think."
- Reply: "You mean if we processed all events? Yeah, I think I'd leave it as is."
- Reply: "Oh, no, I mean literally just move the setter here into a check for whether we're about to go around again."
- Reply: "Ah, yes, I had understood as much, but we definitely need to reset it in the case we leave the method. We could have moved the …"

A separate conversation on the `persistence_notifier.notify()` lines was marked as resolved by wpaulino.
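As a reading aid, here is a minimal sketch of the concurrency pattern the new macro body implements. The `Processor` type, `processor_busy` field, and `u32` events are invented for illustration and are not LDK API; a usage example follows the review Q&A below.

```rust
use std::sync::Mutex;
use std::sync::atomic::{AtomicBool, Ordering};

struct Processor {
    pending_events: Mutex<Vec<u32>>,
    // Flag ensuring only one caller processes events at a time.
    processor_busy: AtomicBool,
}

impl Processor {
    fn process_events(&self, mut handle: impl FnMut(u32)) {
        let mut processed_all_events = false;
        while !processed_all_events {
            // Try to become the sole processor; if another task already
            // holds the flag, it is responsible for draining the queue.
            if self.processor_busy
                .compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed)
                .is_err()
            {
                return;
            }

            // Clone rather than drain: if handling is interrupted partway
            // through, the events remain queued for a later retry.
            let pending = self.pending_events.lock().unwrap().clone();
            let num_events = pending.len();

            for ev in pending {
                handle(ev);
            }

            // Drop exactly the events we handled; anything pushed while we
            // were busy stays queued, and we loop around to pick it up. The
            // flag is cleared while the queue lock is still held, so an
            // event queued after our emptiness check finds the flag free.
            let mut queue = self.pending_events.lock().unwrap();
            queue.drain(..num_events);
            processed_all_events = queue.is_empty();
            self.processor_busy.store(false, Ordering::Release);
        }
    }
}
```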
```diff
@@ -1771,6 +1790,7 @@ where
 			per_peer_state: FairRwLock::new(HashMap::new()),

 			pending_events: Mutex::new(Vec::new()),
+			pending_events_processor: AtomicBool::new(false),
 			pending_background_events: Mutex::new(Vec::new()),
 			total_consistency_lock: RwLock::new(()),
 			persistence_notifier: Notifier::new(),
```
```diff
@@ -7916,6 +7936,7 @@ where
 			per_peer_state: FairRwLock::new(per_peer_state),

 			pending_events: Mutex::new(pending_events_read),
+			pending_events_processor: AtomicBool::new(false),
 			pending_background_events: Mutex::new(pending_background_events),
 			total_consistency_lock: RwLock::new(()),
 			persistence_notifier: Notifier::new(),
```
Review discussion:

Q: How come this is all run in a while loop? IIUC there may be other events added to `pending_events` by other async tasks while handling the events, which is how we end up not having processed all events, but why do we keep processing until `pending_events` is empty, as opposed to just processing the events that were present when we first called this function? Does it make much of a difference, or is it more that we might as well do it while we're here?

A: Because we no longer allow multiple processors to run at the same time. If one `process_events` call starts and makes some progress, and then an event is generated, causing a second `process_events` call to happen, the second call might return early even though there are events the user expects to have been processed. Thus, we need to make sure the first `process_events` goes around again and processes the remaining events.
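To make that answer concrete, the following usage of the hypothetical `Processor` sketch above deterministically reproduces the hand-off: the handler for the first event enqueues a second one mid-processing, and only the outer `while` loop picks it up.

```rust
use std::sync::Mutex;
use std::sync::atomic::AtomicBool;

fn main() {
    // Reuses the hypothetical `Processor` from the sketch above.
    let p = Processor {
        pending_events: Mutex::new(vec![1]),
        processor_busy: AtomicBool::new(false),
    };
    p.process_events(|ev| {
        println!("handling event {ev}");
        if ev == 1 {
            // Simulate another task enqueueing an event (and then calling
            // process_events itself) while event 1 is in flight: that
            // nested call would fail the compare_exchange and return
            // early, leaving event 2 for the outer loop's next pass.
            p.pending_events.lock().unwrap().push(2);
        }
    });
    // Prints "handling event 1" then "handling event 2": the first call
    // went around the while loop again rather than dropping the event.
}
```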