Skip to content

Commit 933b8b1

Browse files
Merge pull request #188 from frankmcsherry/logging_revamp
Logging revamp
2 parents 8187ebb + 8754664 commit 933b8b1

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

77 files changed

+1048
-957
lines changed

CHANGELOG.md

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
# Changelog
2+
All notable changes to this project will be documented in this file.
3+
4+
## Unreleased
5+
- Many logging events have been rationalized. Operators and Channels should all have a worker-unique identifier that can be used to connect their metadata with events involving them. Previously this was a bit of a shambles.
6+
7+
## 0.7.0
8+
9+
### Added
10+
- You can now construct your own vector of allocator builders and supply them directly to `timely::execute::execute_from`. Previously one was restricted to whatever a `Configuration` could provide for you. This should allow more pleasant construction of custom allocators, or custom construction of existing allocators.
11+
- Each timely worker now has a log registry, `worker.log_registry()`, from which you can register and acquire typed loggers for named log streams. This supports user-level logging, as well as user-configurable timely logging. Timely logging is under the name `"timely"`.
12+
13+
### Changed
14+
- The `Root` type has been renamed `Worker` and is found in the `::worker` module. The methods of the `ScopeParent` trait are now in the `::worker::AsWorker` trait.
15+
- The communication `Allocate` trait's main method `allocate` now takes a worker-unique identifier to use for the channel. The allocator may or may not use the information (most often for logging), but they are allowed to be incorrect if one allocates two channels with the same identifier.
16+
- A `CapabilityRef<T>` now supports `retain_for(usize)` which indicates a specific output port the capability should be retain for use with. The `retain()` method still exists for now and is equivalent to `retain(0)`. This change also comes with the *inability* to use an arbitrary `Capability<T>` with any output; using a capability bound to the wrong output will result in a run-time error.
17+
- The `unary` and `binary` operators now provide `data` as a `RefOrMut`, which does not implement `DerefMut`. More information on how to port methods can be found [here](https://github.com/frankmcsherry/timely-dataflow/pull/135#issuecomment-418355284).
18+
19+
20+
### Removed
21+
- The deprecated `Unary` and `Binary` operator extension traits have been removed in favor of the `Operator` trait that supports both of them, as well as their `_notify` variants.

Cargo.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,9 @@ license = "MIT"
1818
[dependencies]
1919
abomonation = { git = "https://github.com/frankmcsherry/abomonation" }
2020
abomonation_derive = "0.3"
21-
bytes = { path = "./bytes" }
21+
timely_bytes = { path = "./bytes" }
22+
timely_logging = { path = "./logging" }
2223
timely_communication = { path = "./communication"}
23-
time="0.1.34"
2424

2525
[dev-dependencies]
2626
timely_sort="0.1.6"

bytes/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
[package]
2-
name = "bytes"
2+
name = "timely_bytes"
33
version = "0.1.0"
44
authors = ["Frank McSherry <fmcsherry@me.com>"]
55

communication/Cargo.toml

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,11 @@ default=["arg_parse"]
1717
arg_parse=["getopts"]
1818

1919
[dependencies]
20-
#byteorder="1"
2120
getopts={version="0.2.14", optional=true}
22-
time="0.1.34"
2321
abomonation = { git = "https://github.com/frankmcsherry/abomonation" }
2422
abomonation_derive = "0.3"
25-
bytes = { path = "../bytes" }
23+
timely_bytes = { path = "../bytes" }
24+
timely_logging = { path = "../logging" }
2625

2726
[profile.release]
2827
opt-level = 3

communication/examples/hello.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,18 @@
11
extern crate timely_communication;
22

33
use std::ops::Deref;
4-
use timely_communication::Message;
4+
use timely_communication::{Message, Allocate};
55

66
fn main() {
77

88
// extract the configuration from user-supplied arguments, initialize the computation.
99
let config = timely_communication::Configuration::from_args(std::env::args()).unwrap();
10-
let logger = ::std::sync::Arc::new(|_| timely_communication::logging::BufferingLogger::new_inactive());
11-
let guards = timely_communication::initialize(config, logger, |mut allocator| {
10+
let guards = timely_communication::initialize(config, |mut allocator| {
1211

1312
println!("worker {} of {} started", allocator.index(), allocator.peers());
1413

1514
// allocates pair of senders list and one receiver.
16-
let (mut senders, mut receiver, _) = allocator.allocate();
15+
let (mut senders, mut receiver) = allocator.allocate(0);
1716

1817
// send typed data along each channel
1918
for i in 0 .. allocator.peers() {

communication/src/allocator/binary.rs

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@ pub struct Binary {
1010
pub inner: Process, // inner Process (use for process-local channels)
1111
pub index: usize, // index of this worker
1212
pub peers: usize, // number of peer workers
13-
pub allocated: usize, // indicates how many channels have been allocated (locally).
1413

1514
// for loading up state in the networking threads.
1615
pub readers: Vec<Sender<((usize, usize), Sender<Vec<u8>>)>>,
@@ -26,12 +25,12 @@ impl Binary {
2625
impl Allocate for Binary {
2726
fn index(&self) -> usize { self.index }
2827
fn peers(&self) -> usize { self.peers }
29-
fn allocate<T:Data>(&mut self) -> (Vec<Box<Push<Message<T>>>>, Box<Pull<Message<T>>>, Option<usize>) {
28+
fn allocate<T: Data>(&mut self, identifier: usize) -> (Vec<Box<Push<Message<T>>>>, Box<Pull<Message<T>>>) {
3029
let mut pushers: Vec<Box<Push<Message<T>>>> = Vec::new();
3130

3231
// we'll need process-local channels as well (no self-loop binary connection in this design; perhaps should allow)
3332
let inner_peers = self.inner.peers();
34-
let (inner_sends, inner_recv, _) = self.inner.allocate();
33+
let (inner_sends, inner_recv) = self.inner.allocate(identifier);
3534

3635
// prep a pushable for each endpoint, multiplied by inner_peers
3736
for index in 0..self.readers.len() {
@@ -42,7 +41,7 @@ impl Allocate for Binary {
4241
if index >= self.index / inner_peers { target_index += inner_peers; }
4342

4443
let header = MessageHeader {
45-
channel: self.allocated,
44+
channel: identifier,
4645
source: self.index,
4746
target: target_index,
4847
length: 0,
@@ -75,9 +74,7 @@ impl Allocate for Binary {
7574
});
7675
let pullable = Box::new(Puller::<T>::new(inner_recv, recv, logger)) as Box<Pull<Message<T>>>;
7776

78-
self.allocated += 1;
79-
80-
(pushers, pullable, Some(self.allocated - 1))
77+
(pushers, pullable)
8178
}
8279
}
8380

communication/src/allocator/generic.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -42,12 +42,12 @@ impl Generic {
4242
}
4343
}
4444
/// Constructs several send endpoints and one receive endpoint.
45-
pub fn allocate<T: Data>(&mut self) -> (Vec<Box<Push<Message<T>>>>, Box<Pull<Message<T>>>, Option<usize>) {
45+
fn allocate<T: Data>(&mut self, identifier: usize) -> (Vec<Box<Push<Message<T>>>>, Box<Pull<Message<T>>>) {
4646
match self {
47-
&mut Generic::Thread(ref mut t) => t.allocate(),
48-
&mut Generic::Process(ref mut p) => p.allocate(),
49-
&mut Generic::ProcessBinary(ref mut pb) => pb.allocate(),
50-
&mut Generic::ZeroCopy(ref mut z) => z.allocate(),
47+
&mut Generic::Thread(ref mut t) => t.allocate(identifier),
48+
&mut Generic::Process(ref mut p) => p.allocate(identifier),
49+
&mut Generic::ProcessBinary(ref mut pb) => pb.allocate(identifier),
50+
&mut Generic::ZeroCopy(ref mut z) => z.allocate(identifier),
5151
}
5252
}
5353
/// Perform work before scheduling operators.
@@ -73,8 +73,8 @@ impl Generic {
7373
impl Allocate for Generic {
7474
fn index(&self) -> usize { self.index() }
7575
fn peers(&self) -> usize { self.peers() }
76-
fn allocate<T: Data>(&mut self) -> (Vec<Box<Push<Message<T>>>>, Box<Pull<Message<T>>>, Option<usize>) {
77-
self.allocate()
76+
fn allocate<T: Data>(&mut self, identifier: usize) -> (Vec<Box<Push<Message<T>>>>, Box<Pull<Message<T>>>) {
77+
self.allocate(identifier)
7878
}
7979

8080
fn pre_work(&mut self) { self.pre_work(); }

communication/src/allocator/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ pub trait Allocate {
3636
/// The number of workers.
3737
fn peers(&self) -> usize;
3838
/// Constructs several send endpoints and one receive endpoint.
39-
fn allocate<T: Data>(&mut self) -> (Vec<Box<Push<Message<T>>>>, Box<Pull<Message<T>>>, Option<usize>);
39+
fn allocate<T: Data>(&mut self, identifier: usize) -> (Vec<Box<Push<Message<T>>>>, Box<Pull<Message<T>>>);
4040
/// Work performed before scheduling dataflows.
4141
fn pre_work(&mut self) { }
4242
/// Work performed after scheduling dataflows.

communication/src/allocator/process.rs

Lines changed: 44 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -3,30 +3,30 @@
33
use std::sync::{Arc, Mutex};
44
use std::any::Any;
55
use std::sync::mpsc::{Sender, Receiver, channel};
6+
use std::collections::HashMap;
67

78
use allocator::{Allocate, AllocateBuilder, Message, Thread};
89
use {Push, Pull};
910

1011
/// An allocater for inter-thread, intra-process communication
1112
pub struct Process {
12-
inner: Thread, // inner Thread
13-
index: usize, // number out of peers
14-
peers: usize, // number of peer allocators (for typed channel allocation).
15-
allocated: usize, // indicates how many have been allocated (locally).
16-
channels: Arc<Mutex<Vec<Box<Any+Send>>>>, // Box<Any+Send> -> Box<Vec<Option<(Vec<Sender<T>>, Receiver<T>)>>>
13+
inner: Thread,
14+
index: usize,
15+
peers: usize,
16+
// below: `Box<Any+Send>` is a `Box<Vec<Option<(Vec<Sender<T>>, Receiver<T>)>>>`
17+
channels: Arc<Mutex<HashMap<usize, Box<Any+Send>>>>,
1718
}
1819

1920
impl Process {
2021
/// Access the wrapped inner allocator.
2122
pub fn inner<'a>(&'a mut self) -> &'a mut Thread { &mut self.inner }
2223
/// Allocate a list of connected intra-process allocators.
2324
pub fn new_vector(count: usize) -> Vec<Process> {
24-
let channels = Arc::new(Mutex::new(Vec::new()));
25+
let channels = Arc::new(Mutex::new(HashMap::new()));
2526
(0 .. count).map(|index| Process {
2627
inner: Thread,
2728
index: index,
2829
peers: count,
29-
allocated: 0,
3030
channels: channels.clone(),
3131
}).collect()
3232
}
@@ -35,43 +35,52 @@ impl Process {
3535
impl Allocate for Process {
3636
fn index(&self) -> usize { self.index }
3737
fn peers(&self) -> usize { self.peers }
38-
fn allocate<T: Any+Send+'static>(&mut self) -> (Vec<Box<Push<Message<T>>>>, Box<Pull<Message<T>>>, Option<usize>) {
38+
fn allocate<T: Any+Send+'static>(&mut self, identifier: usize) -> (Vec<Box<Push<Message<T>>>>, Box<Pull<Message<T>>>) {
3939

4040
// ensure exclusive access to shared list of channels
4141
let mut channels = self.channels.lock().ok().expect("mutex error?");
4242

43-
// we may need to alloc a new channel ...
44-
if self.allocated == channels.len() {
45-
let mut pushers = Vec::new();
46-
let mut pullers = Vec::new();
47-
for _ in 0..self.peers {
48-
let (s, r): (Sender<Message<T>>, Receiver<Message<T>>) = channel();
49-
pushers.push(Pusher { target: s });
50-
pullers.push(Puller { source: r, current: None });
51-
}
52-
53-
let mut to_box = Vec::new();
54-
for recv in pullers.into_iter() {
55-
to_box.push(Some((pushers.clone(), recv)));
56-
}
57-
58-
channels.push(Box::new(to_box));
59-
}
43+
let (send, recv, empty) = {
44+
45+
// we may need to alloc a new channel ...
46+
let mut entry = channels.entry(identifier).or_insert_with(|| {
47+
48+
let mut pushers = Vec::new();
49+
let mut pullers = Vec::new();
50+
for _ in 0..self.peers {
51+
let (s, r): (Sender<Message<T>>, Receiver<Message<T>>) = channel();
52+
pushers.push(Pusher { target: s });
53+
pullers.push(Puller { source: r, current: None });
54+
}
55+
56+
let mut to_box = Vec::new();
57+
for recv in pullers.into_iter() {
58+
to_box.push(Some((pushers.clone(), recv)));
59+
}
60+
61+
Box::new(to_box)
62+
});
63+
64+
let vector =
65+
entry
66+
.downcast_mut::<(Vec<Option<(Vec<Pusher<Message<T>>>, Puller<Message<T>>)>>)>()
67+
.expect("failed to correctly cast channel");
68+
69+
let (send, recv) =
70+
vector[self.index]
71+
.take()
72+
.expect("channel already consumed");
73+
74+
let empty = vector.iter().all(|x| x.is_none());
6075

61-
let vector =
62-
channels[self.allocated]
63-
.downcast_mut::<(Vec<Option<(Vec<Pusher<Message<T>>>, Puller<Message<T>>)>>)>()
64-
.expect("failed to correctly cast channel");
76+
(send, recv, empty)
77+
};
6578

66-
let (send, recv) =
67-
vector[self.index]
68-
.take()
69-
.expect("channel already consumed");
79+
if empty { channels.remove(&identifier); }
7080

71-
self.allocated += 1;
7281
let mut temp = Vec::new();
7382
for s in send.into_iter() { temp.push(Box::new(s) as Box<Push<Message<T>>>); }
74-
(temp, Box::new(recv) as Box<Pull<super::Message<T>>>, None)
83+
(temp, Box::new(recv) as Box<Pull<super::Message<T>>>)
7584
}
7685
}
7786

communication/src/allocator/thread.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,9 @@ pub struct Thread;
1313
impl Allocate for Thread {
1414
fn index(&self) -> usize { 0 }
1515
fn peers(&self) -> usize { 1 }
16-
fn allocate<T: 'static>(&mut self) -> (Vec<Box<Push<Message<T>>>>, Box<Pull<Message<T>>>, Option<usize>) {
16+
fn allocate<T: 'static>(&mut self, _identifier: usize) -> (Vec<Box<Push<Message<T>>>>, Box<Pull<Message<T>>>) {
1717
let (pusher, puller) = Thread::new();
18-
(vec![Box::new(pusher)], Box::new(puller), None)
18+
(vec![Box::new(pusher)], Box::new(puller))
1919
}
2020
}
2121

communication/src/allocator/zero_copy/allocator.rs

Lines changed: 18 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
//! Zero-copy allocator based on TCP.
22
use std::rc::Rc;
33
use std::cell::RefCell;
4-
use std::collections::VecDeque;
4+
use std::collections::{VecDeque, HashMap};
55
// use std::sync::mpsc::{channel, Sender, Receiver};
66

77
use bytes::arc::Bytes;
@@ -87,12 +87,12 @@ impl<A: Allocate> TcpBuilder<A> {
8787
inner: self.inner,
8888
index: self.index,
8989
peers: self.peers,
90-
allocated: 0,
90+
// allocated: 0,
9191
_signal: self.signal,
9292
staged: Vec::new(),
9393
sends,
9494
recvs: self.recvs,
95-
to_local: Vec::new(),
95+
to_local: HashMap::new(),
9696
}
9797
}
9898
}
@@ -104,32 +104,29 @@ pub struct TcpAllocator<A: Allocate> {
104104

105105
index: usize, // number out of peers
106106
peers: usize, // number of peer allocators (for typed channel allocation).
107-
allocated: usize, // indicates how many channels have been allocated (locally).
107+
// allocated: usize, // indicates how many channels have been allocated (locally).
108108

109109
_signal: Signal,
110110

111111
staged: Vec<Bytes>,
112112

113113
// sending, receiving, and responding to binary buffers.
114-
sends: Vec<Rc<RefCell<SendEndpoint<MergeQueue>>>>, // sends[x] -> goes to process x.
115-
recvs: Vec<MergeQueue>, // recvs[x] <- from process x?.
116-
to_local: Vec<Rc<RefCell<VecDeque<Bytes>>>>, // to worker-local typed pullers.
114+
sends: Vec<Rc<RefCell<SendEndpoint<MergeQueue>>>>, // sends[x] -> goes to process x.
115+
recvs: Vec<MergeQueue>, // recvs[x] <- from process x?.
116+
to_local: HashMap<usize, Rc<RefCell<VecDeque<Bytes>>>>, // to worker-local typed pullers.
117117
}
118118

119119
impl<A: Allocate> Allocate for TcpAllocator<A> {
120120
fn index(&self) -> usize { self.index }
121121
fn peers(&self) -> usize { self.peers }
122-
fn allocate<T: Data>(&mut self) -> (Vec<Box<Push<Message<T>>>>, Box<Pull<Message<T>>>, Option<usize>) {
123-
124-
let channel_id = self.allocated;
125-
self.allocated += 1;
122+
fn allocate<T: Data>(&mut self, identifier: usize) -> (Vec<Box<Push<Message<T>>>>, Box<Pull<Message<T>>>) {
126123

127124
// Result list of boxed pushers.
128125
let mut pushes = Vec::<Box<Push<Message<T>>>>::new();
129126

130127
// Inner exchange allocations.
131128
let inner_peers = self.inner.peers();
132-
let (mut inner_sends, inner_recv, _) = self.inner.allocate();
129+
let (mut inner_sends, inner_recv) = self.inner.allocate(identifier);
133130

134131
for target_index in 0 .. self.peers() {
135132

@@ -142,7 +139,7 @@ impl<A: Allocate> Allocate for TcpAllocator<A> {
142139
else {
143140
// message header template.
144141
let header = MessageHeader {
145-
channel: channel_id,
142+
channel: identifier,
146143
source: self.index,
147144
target: target_index,
148145
length: 0,
@@ -155,13 +152,11 @@ impl<A: Allocate> Allocate for TcpAllocator<A> {
155152
}
156153
}
157154

158-
while self.to_local.len() <= channel_id {
159-
self.to_local.push(Rc::new(RefCell::new(VecDeque::new())));
160-
}
155+
self.to_local.insert(identifier, Rc::new(RefCell::new(VecDeque::new())));
161156

162-
let puller = Box::new(PullerInner::new(inner_recv, self.to_local[channel_id].clone()));
157+
let puller = Box::new(PullerInner::new(inner_recv, self.to_local[&identifier].clone()));
163158

164-
(pushes, puller, None)
159+
(pushes, puller, )
165160
}
166161

167162
// Perform preparatory work, most likely reading binary buffers from self.recv.
@@ -186,12 +181,11 @@ impl<A: Allocate> Allocate for TcpAllocator<A> {
186181

187182
// Ensure that a queue exists.
188183
// We may receive data before allocating, and shouldn't block.
189-
while self.to_local.len() <= header.channel {
190-
self.to_local.push(Rc::new(RefCell::new(VecDeque::new())));
191-
}
192-
193-
// Introduce the binary slice into the operator input queue.
194-
self.to_local[header.channel].borrow_mut().push_back(peel);
184+
self.to_local
185+
.entry(header.channel)
186+
.or_insert_with(|| Rc::new(RefCell::new(VecDeque::new())))
187+
.borrow_mut()
188+
.push_back(peel);
195189
}
196190
else {
197191
println!("failed to read full header!");

0 commit comments

Comments
 (0)