Skip to content

Commit 8155e02

Browse files
authored
make capture::source's token thread-safe (#364)
* make capture::source's token thread-safe
* clean up capability dropping more
* break tuple out into variables
1 parent 27492f2 commit 8155e02

File tree

1 file changed

+74
-55
lines changed

1 file changed

+74
-55
lines changed

src/capture.rs

Lines changed: 74 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -265,9 +265,24 @@ pub mod source {
265265
use std::cell::RefCell;
266266
use std::hash::Hash;
267267
use std::rc::Rc;
268+
use std::marker::{Send, Sync};
269+
use std::sync::Arc;
268270
use timely::dataflow::{Scope, Stream, operators::{Capability, CapabilitySet}};
269271
use timely::progress::Timestamp;
270-
use timely::scheduling::SyncActivator;
272+
use timely::scheduling::{SyncActivator, activate::SyncActivateOnDrop};
273+
274+
// TODO(guswynn): implement this generally in timely
275+
struct DropActivator {
276+
activator: Arc<SyncActivator>,
277+
}
278+
279+
impl Drop for DropActivator {
280+
fn drop(&mut self) {
281+
// Best effort: failure to activate
282+
// is ignored
283+
let _ = self.activator.activate();
284+
}
285+
}
271286

272287
/// Constructs a stream of updates from a source of messages.
273288
///
@@ -277,7 +292,7 @@ pub mod source {
277292
pub fn build<G, B, I, D, T, R>(
278293
scope: G,
279294
source_builder: B,
280-
) -> (Box<dyn std::any::Any>, Stream<G, (D, T, R)>)
295+
) -> (Box<dyn std::any::Any + Send + Sync>, Stream<G, (D, T, R)>)
281296
where
282297
G: Scope<Timestamp = T>,
283298
B: FnOnce(SyncActivator) -> I,
@@ -324,8 +339,7 @@ pub mod source {
324339
// Some message distribution logic depends on the number of workers.
325340
let workers = scope.peers();
326341

327-
// Vector of strong references to capabilities, which can be dropped to terminate the sources.
328-
let mut tokens = Vec::new();
342+
let mut token = None;
329343
// Frontier owned by the FEEDBACK operator and consulted by the MESSAGES operators.
330344
let mut antichain = MutableAntichain::new();
331345
antichain.update_iter(Some((T::minimum(), workers as i64)));
@@ -337,66 +351,71 @@ pub mod source {
337351
let address = messages_op.operator_info().address;
338352
let activator = scope.sync_activator_for(&address);
339353
let activator2 = scope.activator_for(&address);
340-
let activations = scope.activations();
354+
let drop_activator = Arc::new(SyncActivateOnDrop::new((), scope.sync_activator_for(&address)));
341355
let mut source = source_builder(activator);
342356
let (mut updates_out, updates) = messages_op.new_output();
343357
let (mut progress_out, progress) = messages_op.new_output();
344-
let tokens_mut = &mut tokens;
345-
messages_op.build(move |capabilities| {
358+
messages_op.build(|capabilities| {
359+
360+
// A Weak that communicates whether the returned token has been dropped.
361+
let drop_activator_weak = Arc::downgrade(&drop_activator);
362+
363+
token = Some(drop_activator);
364+
346365
// Read messages from some source; shuffle them to UPDATES and PROGRESS; share capability with FEEDBACK.
347-
// First, wrap capabilities in a rc refcell so that they can be downgraded to weak references.
348-
use timely::scheduling::activate::ActivateOnDrop;
349-
let capability_sets = (CapabilitySet::from_elem(capabilities[0].clone()), CapabilitySet::from_elem(capabilities[1].clone()));
350-
let capability_sets = ActivateOnDrop::new(capability_sets, Rc::new(address), activations);
351-
let strong_capabilities = Rc::new(RefCell::new(capability_sets));
352-
let local_capabilities = Rc::downgrade(&strong_capabilities);
353-
tokens_mut.push(strong_capabilities);
366+
let mut updates_caps = CapabilitySet::from_elem(capabilities[0].clone());
367+
let mut progress_caps = CapabilitySet::from_elem(capabilities[1].clone());
354368
// Capture the shared frontier to read out frontier updates to apply.
355369
let local_frontier = shared_frontier.clone();
356370
//
357371
move |_frontiers| {
358372
// First check to ensure that we haven't been terminated by someone dropping our tokens.
359-
if let Some(capabilities) = local_capabilities.upgrade() {
360-
let (updates_caps, progress_caps) = &mut **capabilities.borrow_mut();
361-
// Consult our shared frontier, and ensure capabilities are downgraded to it.
362-
let shared_frontier = local_frontier.borrow();
363-
updates_caps.downgrade(&shared_frontier.frontier());
364-
progress_caps.downgrade(&shared_frontier.frontier());
365-
366-
// Next check to see if we have been terminated by the source being complete.
367-
if !updates_caps.is_empty() && !progress_caps.is_empty() {
368-
let mut updates = updates_out.activate();
369-
let mut progress = progress_out.activate();
370-
371-
// TODO(frank): this is a moment where multi-temporal capabilities need to be fixed up.
372-
// Specifically, there may not be one capability valid for all updates.
373-
let mut updates_session = updates.session(&updates_caps[0]);
374-
let mut progress_session = progress.session(&progress_caps[0]);
375-
376-
// We presume the iterator will yield if appropriate.
377-
while let Some(message) = source.next() {
378-
match message {
379-
Message::Updates(mut updates) => {
380-
updates_session.give_vec(&mut updates);
373+
if drop_activator_weak.upgrade().is_none() {
374+
// Give up our capabilities
375+
updates_caps.downgrade(&[]);
376+
progress_caps.downgrade(&[]);
377+
// never continue, even if we are (erroneously) activated again.
378+
return;
379+
}
380+
381+
// Consult our shared frontier, and ensure capabilities are downgraded to it.
382+
let shared_frontier = local_frontier.borrow();
383+
updates_caps.downgrade(&shared_frontier.frontier());
384+
progress_caps.downgrade(&shared_frontier.frontier());
385+
386+
// Next check to see if we have been terminated by the source being complete.
387+
if !updates_caps.is_empty() && !progress_caps.is_empty() {
388+
let mut updates = updates_out.activate();
389+
let mut progress = progress_out.activate();
390+
391+
// TODO(frank): this is a moment where multi-temporal capabilities need to be fixed up.
392+
// Specifically, there may not be one capability valid for all updates.
393+
let mut updates_session = updates.session(&updates_caps[0]);
394+
let mut progress_session = progress.session(&progress_caps[0]);
395+
396+
// We presume the iterator will yield if appropriate.
397+
while let Some(message) = source.next() {
398+
match message {
399+
Message::Updates(mut updates) => {
400+
updates_session.give_vec(&mut updates);
401+
}
402+
Message::Progress(progress) => {
403+
// We must send a copy of each progress message to all workers,
404+
// but we can partition the counts across workers by timestamp.
405+
let mut to_worker = vec![Vec::new(); workers];
406+
for (time, count) in progress.counts {
407+
to_worker[(time.hashed() as usize) % workers]
408+
.push((time, count));
381409
}
382-
Message::Progress(progress) => {
383-
// We must send a copy of each progress message to all workers,
384-
// but we can partition the counts across workers by timestamp.
385-
let mut to_worker = vec![Vec::new(); workers];
386-
for (time, count) in progress.counts {
387-
to_worker[(time.hashed() as usize) % workers]
388-
.push((time, count));
389-
}
390-
for (worker, counts) in to_worker.into_iter().enumerate() {
391-
progress_session.give((
392-
worker,
393-
Progress {
394-
lower: progress.lower.clone(),
395-
upper: progress.upper.clone(),
396-
counts,
397-
},
398-
));
399-
}
410+
for (worker, counts) in to_worker.into_iter().enumerate() {
411+
progress_session.give((
412+
worker,
413+
Progress {
414+
lower: progress.lower.clone(),
415+
upper: progress.upper.clone(),
416+
counts,
417+
},
418+
));
400419
}
401420
}
402421
}
@@ -558,7 +577,7 @@ pub mod source {
558577
}
559578
});
560579

561-
(Box::new(tokens), changes)
580+
(Box::new(token.unwrap()), changes)
562581
}
563582
}
564583

0 commit comments

Comments
 (0)