Skip to content

Commit ee878f6

Browse files
authored
Merge pull request #19560 from teskje/dataflow-drop-duration
Introspection of dataflow shutdown durations
2 parents b19cd44 + 9f0a03e commit ee878f6

File tree

13 files changed

+203
-44
lines changed

13 files changed

+203
-44
lines changed

doc/user/content/sql/system-catalog/mz_internal.md

Lines changed: 21 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -415,6 +415,21 @@ The `mz_dataflow_addresses` view describes how the [dataflow] channels and opera
415415
| `id` | [`bigint`] | The ID of the channel or operator. Corresponds to [`mz_dataflow_channels.id`](#mz_dataflow_channels) or [`mz_dataflow_operators.id`](#mz_dataflow_operators). |
416416
| `address` | [`bigint list`] | A list of scope-local indexes indicating the path from the root to this channel or operator. |
417417

418+
### `mz_dataflow_arrangement_sizes`
419+
420+
The `mz_dataflow_arrangement_sizes` view describes how many records and batches
421+
are contained in operators under each dataflow.
422+
423+
| Field | Type | Meaning |
424+
|---------------|------------|------------------------------------------------------------------------------|
425+
| `id` | [`bigint`] | The ID of the [dataflow]. Corresponds to [`mz_dataflows.id`](#mz_dataflows). |
426+
| `name` | [`text`] | The name of the object (e.g., index) maintained by the dataflow. |
427+
| `records` | [`bigint`] | The number of records in all arrangements in the dataflow. |
428+
| `batches` | [`bigint`] | The number of batches in all arrangements in the dataflow. |
429+
| `size` | [`bigint`] | The utilized size in bytes of the arrangements. |
430+
| `capacity` | [`bigint`] | The capacity in bytes of the arrangements. Can be larger than the size. |
431+
| `allocations` | [`bigint`] | The number of separate memory allocations backing the arrangements. |
432+
418433
### `mz_dataflow_channels`
419434

420435
The `mz_dataflow_channels` view describes the communication channels between [dataflow] operators.
@@ -469,20 +484,14 @@ The `mz_dataflow_operator_parents` view describes how [dataflow] operators are n
469484
| `id` | [`bigint`] | The ID of the operator. Corresponds to [`mz_dataflow_operators.id`](#mz_dataflow_operators). |
470485
| `parent_id` | [`bigint`] | The ID of the operator's parent operator. Corresponds to [`mz_dataflow_operators.id`](#mz_dataflow_operators). |
471486

472-
### `mz_dataflow_arrangement_sizes`
487+
### `mz_dataflow_shutdown_durations_histogram`
473488

474-
The `mz_dataflow_arrangement_sizes` view describes how many records and batches
475-
are contained in operators under each dataflow.
489+
The `mz_dataflow_shutdown_durations_histogram` view describes a histogram of the time in nanoseconds required to fully shut down dropped [dataflows][dataflow].
476490

477-
| Field | Type | Meaning |
478-
|---------------|------------|------------------------------------------------------------------------------|
479-
| `id` | [`bigint`] | The ID of the [dataflow]. Corresponds to [`mz_dataflows.id`](#mz_dataflows). |
480-
| `name` | [`bigint`] | The name of the object (e.g., index) maintained by the dataflow. |
481-
| `records` | [`bigint`] | The number of records in all arrangements in the dataflow. |
482-
| `batches` | [`bigint`] | The number of batches in all arrangements in the dataflow. |
483-
| `size` | [`bigint`] | The utilized size in bytes of the arrangements. |
484-
| `capacity` | [`bigint`] | The capacity in bytes of the arrangements. Can be larger than the size. |
485-
| `allocations` | [`bigint`] | The number of separate memory allocations backing the arrangements. |
491+
| Field | Type | Meaning |
492+
| -------------- | ------------ | -------- |
493+
| `duration_ns` | [`bigint`] | The upper bound of the bucket in nanoseconds. |
494+
| `count` | [`bigint`] | The (noncumulative) count of dataflows in this bucket. |
486495

487496
### `mz_message_counts`
488497

src/adapter/src/catalog/builtin.rs

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1297,6 +1297,12 @@ pub const MZ_PEEK_DURATIONS_HISTOGRAM_RAW: BuiltinLog = BuiltinLog {
12971297
variant: LogVariant::Compute(ComputeLog::PeekDuration),
12981298
};
12991299

1300+
pub const MZ_DATAFLOW_SHUTDOWN_DURATIONS_HISTOGRAM_RAW: BuiltinLog = BuiltinLog {
1301+
name: "mz_dataflow_shutdown_durations_histogram_raw",
1302+
schema: MZ_INTERNAL_SCHEMA,
1303+
variant: LogVariant::Compute(ComputeLog::ShutdownDuration),
1304+
};
1305+
13001306
pub const MZ_ARRANGEMENT_HEAP_SIZE_RAW: BuiltinLog = BuiltinLog {
13011307
name: "mz_arrangement_heap_size_raw",
13021308
schema: MZ_INTERNAL_SCHEMA,
@@ -2639,6 +2645,28 @@ FROM mz_internal.mz_peek_durations_histogram_per_worker
26392645
GROUP BY duration_ns",
26402646
};
26412647

2648+
pub const MZ_DATAFLOW_SHUTDOWN_DURATIONS_HISTOGRAM_PER_WORKER: BuiltinView = BuiltinView {
2649+
name: "mz_dataflow_shutdown_durations_histogram_per_worker",
2650+
schema: MZ_INTERNAL_SCHEMA,
2651+
sql: "CREATE VIEW mz_internal.mz_dataflow_shutdown_durations_histogram_per_worker AS SELECT
2652+
worker_id, duration_ns, pg_catalog.count(*) AS count
2653+
FROM
2654+
mz_internal.mz_dataflow_shutdown_durations_histogram_raw
2655+
GROUP BY
2656+
worker_id, duration_ns",
2657+
};
2658+
2659+
pub const MZ_DATAFLOW_SHUTDOWN_DURATIONS_HISTOGRAM: BuiltinView = BuiltinView {
2660+
name: "mz_dataflow_shutdown_durations_histogram",
2661+
schema: MZ_INTERNAL_SCHEMA,
2662+
sql: "CREATE VIEW mz_internal.mz_dataflow_shutdown_durations_histogram AS
2663+
SELECT
2664+
duration_ns,
2665+
pg_catalog.sum(count) AS count
2666+
FROM mz_internal.mz_dataflow_shutdown_durations_histogram_per_worker
2667+
GROUP BY duration_ns",
2668+
};
2669+
26422670
pub const MZ_SCHEDULING_ELAPSED_PER_WORKER: BuiltinView = BuiltinView {
26432671
name: "mz_scheduling_elapsed_per_worker",
26442672
schema: MZ_INTERNAL_SCHEMA,
@@ -3760,6 +3788,7 @@ pub static BUILTINS_STATIC: Lazy<Vec<Builtin<NameReference>>> = Lazy::new(|| {
37603788
Builtin::Log(&MZ_MESSAGE_COUNTS_SENT_RAW),
37613789
Builtin::Log(&MZ_ACTIVE_PEEKS_PER_WORKER),
37623790
Builtin::Log(&MZ_PEEK_DURATIONS_HISTOGRAM_RAW),
3791+
Builtin::Log(&MZ_DATAFLOW_SHUTDOWN_DURATIONS_HISTOGRAM_RAW),
37633792
Builtin::Log(&MZ_ARRANGEMENT_HEAP_CAPACITY_RAW),
37643793
Builtin::Log(&MZ_ARRANGEMENT_HEAP_ALLOCATIONS_RAW),
37653794
Builtin::Log(&MZ_ARRANGEMENT_HEAP_SIZE_RAW),
@@ -3848,6 +3877,8 @@ pub static BUILTINS_STATIC: Lazy<Vec<Builtin<NameReference>>> = Lazy::new(|| {
38483877
Builtin::View(&MZ_RECORDS_PER_DATAFLOW),
38493878
Builtin::View(&MZ_PEEK_DURATIONS_HISTOGRAM_PER_WORKER),
38503879
Builtin::View(&MZ_PEEK_DURATIONS_HISTOGRAM),
3880+
Builtin::View(&MZ_DATAFLOW_SHUTDOWN_DURATIONS_HISTOGRAM_PER_WORKER),
3881+
Builtin::View(&MZ_DATAFLOW_SHUTDOWN_DURATIONS_HISTOGRAM),
38513882
Builtin::View(&MZ_SCHEDULING_ELAPSED_PER_WORKER),
38523883
Builtin::View(&MZ_SCHEDULING_ELAPSED),
38533884
Builtin::View(&MZ_SCHEDULING_PARKS_HISTOGRAM_PER_WORKER),

src/compute-client/src/logging.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ message ProtoComputeLog {
5555
google.protobuf.Empty arrangement_heap_size = 8;
5656
google.protobuf.Empty arrangement_heap_capacity = 9;
5757
google.protobuf.Empty arrangement_heap_allocations = 10;
58+
google.protobuf.Empty shutdown_duration = 11;
5859
}
5960
}
6061
message ProtoLogVariant {

src/compute-client/src/logging.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -223,6 +223,7 @@ pub enum ComputeLog {
223223
ArrangementHeapSize,
224224
ArrangementHeapCapacity,
225225
ArrangementHeapAllocations,
226+
ShutdownDuration,
226227
}
227228

228229
impl RustType<ProtoComputeLog> for ComputeLog {
@@ -240,6 +241,7 @@ impl RustType<ProtoComputeLog> for ComputeLog {
240241
ComputeLog::ArrangementHeapSize => ArrangementHeapSize(()),
241242
ComputeLog::ArrangementHeapCapacity => ArrangementHeapCapacity(()),
242243
ComputeLog::ArrangementHeapAllocations => ArrangementHeapAllocations(()),
244+
ComputeLog::ShutdownDuration => ShutdownDuration(()),
243245
}),
244246
}
245247
}
@@ -257,6 +259,7 @@ impl RustType<ProtoComputeLog> for ComputeLog {
257259
Some(ArrangementHeapSize(())) => Ok(ComputeLog::ArrangementHeapSize),
258260
Some(ArrangementHeapCapacity(())) => Ok(ComputeLog::ArrangementHeapCapacity),
259261
Some(ArrangementHeapAllocations(())) => Ok(ComputeLog::ArrangementHeapAllocations),
262+
Some(ShutdownDuration(())) => Ok(ComputeLog::ShutdownDuration),
260263
None => Err(TryFromProtoError::missing_field("ProtoComputeLog::kind")),
261264
}
262265
}
@@ -419,6 +422,10 @@ impl LogVariant {
419422
LogVariant::Compute(ComputeLog::PeekDuration) => RelationDesc::empty()
420423
.with_column("worker_id", ScalarType::UInt64.nullable(false))
421424
.with_column("duration_ns", ScalarType::UInt64.nullable(false)),
425+
426+
LogVariant::Compute(ComputeLog::ShutdownDuration) => RelationDesc::empty()
427+
.with_column("worker_id", ScalarType::UInt64.nullable(false))
428+
.with_column("duration_ns", ScalarType::UInt64.nullable(false)),
422429
}
423430
}
424431

@@ -471,6 +478,7 @@ impl LogVariant {
471478
LogVariant::Compute(ComputeLog::FrontierDelay) => vec![],
472479
LogVariant::Compute(ComputeLog::PeekCurrent) => vec![],
473480
LogVariant::Compute(ComputeLog::PeekDuration) => vec![],
481+
LogVariant::Compute(ComputeLog::ShutdownDuration) => vec![],
474482
}
475483
}
476484
}

src/compute/src/logging/compute.rs

Lines changed: 80 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
1212
use std::any::Any;
1313
use std::cell::RefCell;
14-
use std::collections::{BTreeMap, VecDeque};
14+
use std::collections::{BTreeMap, BTreeSet, VecDeque};
1515
use std::rc::Rc;
1616
use std::time::Duration;
1717

@@ -113,6 +113,11 @@ pub enum ComputeEvent {
113113
/// Operator index
114114
operator: usize,
115115
},
116+
/// All operators of a dataflow have shut down.
117+
DataflowShutdown {
118+
/// Timely worker index of the dataflow.
119+
dataflow_index: usize,
120+
},
116121
}
117122

118123
/// A logged peek event.
@@ -176,6 +181,7 @@ pub(super) fn construct<A: Allocate + 'static>(
176181
let (mut frontier_delay_out, frontier_delay) = demux.new_output();
177182
let (mut peek_out, peek) = demux.new_output();
178183
let (mut peek_duration_out, peek_duration) = demux.new_output();
184+
let (mut shutdown_duration_out, shutdown_duration) = demux.new_output();
179185
let (mut arrangement_heap_size_out, arrangement_heap_size) = demux.new_output();
180186
let (mut arrangement_heap_capacity_out, arrangement_heap_capacity) = demux.new_output();
181187
let (mut arrangement_heap_allocations_out, arrangement_heap_allocations) =
@@ -192,6 +198,7 @@ pub(super) fn construct<A: Allocate + 'static>(
192198
let mut frontier_delay = frontier_delay_out.activate();
193199
let mut peek = peek_out.activate();
194200
let mut peek_duration = peek_duration_out.activate();
201+
let mut shutdown_duration = shutdown_duration_out.activate();
195202
let mut arrangement_heap_size = arrangement_heap_size_out.activate();
196203
let mut arrangement_heap_capacity = arrangement_heap_capacity_out.activate();
197204
let mut arrangement_heap_allocations = arrangement_heap_allocations_out.activate();
@@ -207,6 +214,7 @@ pub(super) fn construct<A: Allocate + 'static>(
207214
frontier_delay: frontier_delay.session(&cap),
208215
peek: peek.session(&cap),
209216
peek_duration: peek_duration.session(&cap),
217+
shutdown_duration: shutdown_duration.session(&cap),
210218
arrangement_heap_size: arrangement_heap_size.session(&cap),
211219
arrangement_heap_capacity: arrangement_heap_capacity.session(&cap),
212220
arrangement_heap_allocations: arrangement_heap_allocations.session(&cap),
@@ -280,7 +288,13 @@ pub(super) fn construct<A: Allocate + 'static>(
280288
let peek_duration = peek_duration.as_collection().map(move |bucket| {
281289
Row::pack_slice(&[
282290
Datum::UInt64(u64::cast_from(worker_id)),
283-
Datum::UInt64(bucket.try_into().expect("pow too big")),
291+
Datum::UInt64(bucket.try_into().expect("bucket too big")),
292+
])
293+
});
294+
let shutdown_duration = shutdown_duration.as_collection().map(move |bucket| {
295+
Row::pack_slice(&[
296+
Datum::UInt64(u64::cast_from(worker_id)),
297+
Datum::UInt64(bucket.try_into().expect("bucket too big")),
284298
])
285299
});
286300

@@ -312,6 +326,7 @@ pub(super) fn construct<A: Allocate + 'static>(
312326
(FrontierDelay, frontier_delay),
313327
(PeekCurrent, peek_current),
314328
(PeekDuration, peek_duration),
329+
(ShutdownDuration, shutdown_duration),
315330
(ArrangementHeapSize, arrangement_heap_size),
316331
(ArrangementHeapCapacity, arrangement_heap_capacity),
317332
(ArrangementHeapAllocations, arrangement_heap_allocations),
@@ -361,6 +376,12 @@ struct DemuxState<A: Allocate> {
361376
export_dataflows: BTreeMap<GlobalId, usize>,
362377
/// Maps dataflow exports to their imports and frontier delay tracking state.
363378
export_imports: BTreeMap<GlobalId, BTreeMap<GlobalId, FrontierDelayState>>,
379+
/// Maps live dataflows to counts of their exports.
380+
dataflow_export_counts: BTreeMap<usize, u32>,
381+
/// Maps dropped dataflows to their drop time (in ns).
382+
dataflow_drop_times: BTreeMap<usize, u128>,
383+
/// Contains dataflows that have shut down but not yet been dropped.
384+
shutdown_dataflows: BTreeSet<usize>,
364385
/// Maps pending peeks to their installation time (in ns).
365386
peek_stash: BTreeMap<Uuid, u128>,
366387
/// Arrangement size stash
@@ -373,6 +394,9 @@ impl<A: Allocate> DemuxState<A> {
373394
worker,
374395
export_dataflows: Default::default(),
375396
export_imports: Default::default(),
397+
dataflow_export_counts: Default::default(),
398+
dataflow_drop_times: Default::default(),
399+
shutdown_dataflows: Default::default(),
376400
peek_stash: Default::default(),
377401
arrangement_size: Default::default(),
378402
}
@@ -403,6 +427,7 @@ struct DemuxOutput<'a> {
403427
frontier_delay: OutputSession<'a, FrontierDelayDatum>,
404428
peek: OutputSession<'a, Peek>,
405429
peek_duration: OutputSession<'a, u128>,
430+
shutdown_duration: OutputSession<'a, u128>,
406431
arrangement_heap_size: OutputSession<'a, ArrangementHeapDatum>,
407432
arrangement_heap_capacity: OutputSession<'a, ArrangementHeapDatum>,
408433
arrangement_heap_allocations: OutputSession<'a, ArrangementHeapDatum>,
@@ -514,6 +539,7 @@ impl<A: Allocate> DemuxHandler<'_, '_, A> {
514539
ArrangementHeapSizeOperatorDrop { operator } => {
515540
self.handle_arrangement_heap_size_operator_dropped(operator)
516541
}
542+
DataflowShutdown { dataflow_index } => self.handle_dataflow_shutdown(dataflow_index),
517543
}
518544
}
519545

@@ -524,13 +550,30 @@ impl<A: Allocate> DemuxHandler<'_, '_, A> {
524550

525551
self.state.export_dataflows.insert(id, dataflow_id);
526552
self.state.export_imports.insert(id, BTreeMap::new());
553+
*self
554+
.state
555+
.dataflow_export_counts
556+
.entry(dataflow_id)
557+
.or_default() += 1;
527558
}
528559

529560
fn handle_export_dropped(&mut self, id: GlobalId) {
530561
let ts = self.ts();
531562
if let Some(dataflow_id) = self.state.export_dataflows.remove(&id) {
532563
let datum = ExportDatum { id, dataflow_id };
533564
self.output.export.give((datum, ts, -1));
565+
566+
match self.state.dataflow_export_counts.get_mut(&dataflow_id) {
567+
entry @ Some(0) | entry @ None => {
568+
error!(
569+
export = ?id,
570+
dataflow = ?dataflow_id,
571+
"invalid dataflow_export_counts entry at time of export drop: {entry:?}",
572+
);
573+
}
574+
Some(1) => self.handle_dataflow_dropped(dataflow_id),
575+
Some(count) => *count -= 1,
576+
}
534577
} else {
535578
error!(
536579
export = ?id,
@@ -564,6 +607,41 @@ impl<A: Allocate> DemuxHandler<'_, '_, A> {
564607
}
565608
}
566609

610+
fn handle_dataflow_dropped(&mut self, id: usize) {
611+
self.state.dataflow_export_counts.remove(&id);
612+
613+
if self.state.shutdown_dataflows.remove(&id) {
614+
// Dataflow has already shut down before it was dropped.
615+
self.output.shutdown_duration.give((0, self.ts(), 1));
616+
} else {
617+
// Dataflow has not yet shut down.
618+
let existing = self
619+
.state
620+
.dataflow_drop_times
621+
.insert(id, self.time.as_nanos());
622+
if existing.is_some() {
623+
error!(dataflow = ?id, "dataflow already dropped");
624+
}
625+
}
626+
}
627+
628+
fn handle_dataflow_shutdown(&mut self, id: usize) {
629+
if let Some(start) = self.state.dataflow_drop_times.remove(&id) {
630+
// Dataflow has already been dropped.
631+
let elapsed_ns = self.time.as_nanos() - start;
632+
let elapsed_pow = elapsed_ns.next_power_of_two();
633+
self.output
634+
.shutdown_duration
635+
.give((elapsed_pow, self.ts(), 1));
636+
} else {
637+
// Dataflow has not yet been dropped.
638+
let was_new = self.state.shutdown_dataflows.insert(id);
639+
if !was_new {
640+
error!(dataflow = ?id, "dataflow already shutdown");
641+
}
642+
}
643+
}
644+
567645
fn handle_export_dependency(&mut self, export_id: GlobalId, import_id: GlobalId) {
568646
let ts = self.ts();
569647
let datum = DependencyDatum {

src/compute/src/logging/initialize.rs

Lines changed: 13 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,7 @@ impl<A: Allocate + 'static> LoggingContext<'_, A> {
9494
self.worker,
9595
self.config,
9696
self.t_event_queue.clone(),
97+
Rc::clone(&self.shared_state),
9798
));
9899
traces.extend(super::reachability::construct(
99100
self.worker,
@@ -131,21 +132,18 @@ impl<A: Allocate + 'static> LoggingContext<'_, A> {
131132
}
132133

133134
fn register_loggers(&self) {
134-
self.worker
135-
.log_register()
136-
.insert_logger("timely", self.simple_logger(self.t_event_queue.clone()));
137-
self.worker
138-
.log_register()
139-
.insert_logger("timely/reachability", self.reachability_logger());
140-
self.worker.log_register().insert_logger(
141-
"differential/arrange",
142-
self.simple_logger(self.d_event_queue.clone()),
143-
);
144-
let compute_logger = self.simple_logger(self.c_event_queue.clone());
145-
self.worker
146-
.log_register()
147-
.insert_logger("materialize/compute", compute_logger.clone());
148-
self.shared_state.borrow_mut().compute_logger = Some(compute_logger);
135+
let t_logger = self.simple_logger(self.t_event_queue.clone());
136+
let r_logger = self.reachability_logger();
137+
let d_logger = self.simple_logger(self.d_event_queue.clone());
138+
let c_logger = self.simple_logger(self.c_event_queue.clone());
139+
140+
let mut register = self.worker.log_register();
141+
register.insert_logger("timely", t_logger);
142+
register.insert_logger("timely/reachability", r_logger);
143+
register.insert_logger("differential/arrange", d_logger);
144+
register.insert_logger("materialize/compute", c_logger.clone());
145+
146+
self.shared_state.borrow_mut().compute_logger = Some(c_logger);
149147
}
150148

151149
fn simple_logger<E: 'static>(&self, event_queue: EventQueue<E>) -> Logger<E> {

0 commit comments

Comments
 (0)