
Commit ca39730

Capture RDKafka Consumer Statistics as an internal built-in log (#6131)
Kafka sources can be defined with a statistics_interval_ms option that controls how often librdkafka / rdkafka calls into Materialize code with Kafka client statistics. Previously, we simply printed these metrics to the logfile. This commit stashes a select set of those metrics in a system table, giving users the ability to query them directly. Fixes #5666
1 parent d8d974a · commit ca39730
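As a usage sketch, not part of the diff: the source name, broker, and topic below are placeholders, and the CREATE SOURCE form is the Kafka syntax from around the time of this commit.

```sql
-- Hypothetical Kafka source with client statistics emitted once per second.
CREATE SOURCE kafka_example
FROM KAFKA BROKER 'localhost:9092' TOPIC 'example-topic'
WITH (statistics_interval_ms = 1000)
FORMAT BYTES;

-- The captured metrics can then be queried like any other relation.
SELECT * FROM mz_catalog.mz_kafka_consumer_statistics;
```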

File tree

11 files changed: +379 −33 lines


src/coord/src/catalog/builtin.rs

Lines changed: 9 additions & 0 deletions

```diff
@@ -554,6 +554,14 @@ pub const MZ_MESSAGE_COUNTS: BuiltinLog = BuiltinLog {
     index_id: GlobalId::System(3029),
 };
 
+pub const MZ_KAFKA_CONSUMER_STATISTICS: BuiltinLog = BuiltinLog {
+    name: "mz_kafka_consumer_statistics",
+    schema: MZ_CATALOG_SCHEMA,
+    variant: LogVariant::Materialized(MaterializedLog::KafkaConsumerInfo),
+    id: GlobalId::System(3030),
+    index_id: GlobalId::System(3031),
+};
+
 lazy_static! {
     pub static ref MZ_VIEW_KEYS: BuiltinTable = BuiltinTable {
         name: "mz_view_keys",
@@ -1271,6 +1279,7 @@ lazy_static! {
         Builtin::Log(&MZ_PEEK_DURATIONS),
         Builtin::Log(&MZ_SOURCE_INFO),
         Builtin::Log(&MZ_MESSAGE_COUNTS),
+        Builtin::Log(&MZ_KAFKA_CONSUMER_STATISTICS),
         Builtin::Table(&MZ_VIEW_KEYS),
         Builtin::Table(&MZ_VIEW_FOREIGN_KEYS),
         Builtin::Table(&MZ_KAFKA_SINKS),
```
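Once registered here, the built-in behaves like any other catalog relation; as a quick sanity check (a sketch, assuming the usual SHOW COLUMNS support):

```sql
-- Inspect the schema of the newly registered built-in log.
SHOW COLUMNS FROM mz_catalog.mz_kafka_consumer_statistics;
```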

src/dataflow-types/src/logging.rs

Lines changed: 21 additions & 0 deletions

```diff
@@ -52,6 +52,7 @@ pub enum MaterializedLog {
     DataflowCurrent,
     DataflowDependency,
     FrontierCurrent,
+    KafkaConsumerInfo,
     PeekCurrent,
     PeekDuration,
     SourceInfo,
@@ -162,6 +163,22 @@ impl LogVariant {
                 .with_column("worker", ScalarType::Int64.nullable(false))
                 .with_column("time", ScalarType::Int64.nullable(false)),
 
+            LogVariant::Materialized(MaterializedLog::KafkaConsumerInfo) => RelationDesc::empty()
+                .with_column("consumer_name", ScalarType::String.nullable(false))
+                .with_column("source_id", ScalarType::String.nullable(false))
+                .with_column("dataflow_id", ScalarType::Int64.nullable(false))
+                .with_column("partition_id", ScalarType::String.nullable(false))
+                .with_column("rx_msgs", ScalarType::Int64.nullable(false))
+                .with_column("rx_bytes", ScalarType::Int64.nullable(false))
+                .with_column("tx_msgs", ScalarType::Int64.nullable(false))
+                .with_column("tx_bytes", ScalarType::Int64.nullable(false))
+                .with_column("lo_offset", ScalarType::Int64.nullable(false))
+                .with_column("hi_offset", ScalarType::Int64.nullable(false))
+                .with_column("ls_offset", ScalarType::Int64.nullable(false))
+                .with_column("app_offset", ScalarType::Int64.nullable(false))
+                .with_column("consumer_lag", ScalarType::Int64.nullable(false))
+                .with_key(vec![0, 1, 2]),
+
             LogVariant::Materialized(MaterializedLog::PeekCurrent) => RelationDesc::empty()
                 .with_column("uuid", ScalarType::String.nullable(false))
                 .with_column("worker", ScalarType::Int64.nullable(false))
@@ -219,6 +236,10 @@ impl LogVariant {
             LogVariant::Materialized(MaterializedLog::DataflowCurrent) => vec![],
             LogVariant::Materialized(MaterializedLog::DataflowDependency) => vec![],
             LogVariant::Materialized(MaterializedLog::FrontierCurrent) => vec![],
+            LogVariant::Materialized(MaterializedLog::KafkaConsumerInfo) => vec![(
+                LogVariant::Materialized(MaterializedLog::SourceInfo),
+                vec![(1, 1), (2, 2), (3, 3)],
+            )],
             LogVariant::Materialized(MaterializedLog::PeekCurrent) => vec![],
             LogVariant::Materialized(MaterializedLog::SourceInfo) => vec![],
             LogVariant::Materialized(MaterializedLog::PeekDuration) => vec![],
```
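The foreign-key declaration above ties columns 1–3 (source_id, dataflow_id, partition_id) to the corresponding columns of mz_source_info, so the two logs join cleanly. A sketch, assuming mz_source_info exposes those column names:

```sql
-- Pair consumer statistics with per-partition ingestion state.
SELECT s.source_name, k.partition_id, k.rx_msgs, k.rx_bytes, k.consumer_lag
FROM mz_catalog.mz_kafka_consumer_statistics AS k
JOIN mz_catalog.mz_source_info AS s
  ON k.source_id = s.source_id
 AND k.dataflow_id = s.dataflow_id
 AND k.partition_id = s.partition_id;
```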

src/dataflow/src/logging/materialized.rs

Lines changed: 105 additions & 18 deletions

```diff
@@ -11,7 +11,8 @@
 
 use std::time::Duration;
 
-use differential_dataflow::{difference::DiffPair, operators::count::CountTotal};
+use differential_dataflow::difference::{DiffPair, DiffVector};
+use differential_dataflow::operators::count::CountTotal;
 use log::error;
 use timely::communication::Allocate;
 use timely::dataflow::operators::capture::EventLink;
@@ -38,6 +39,34 @@ pub enum MaterializedEvent {
         /// Globally unique identifier for the source on which the dataflow depends.
         source: GlobalId,
     },
+    /// Tracks statistics for a particular Kafka consumer / partition pair
+    /// Reference: https://github.com/edenhill/librdkafka/blob/master/STATISTICS.md
+    KafkaConsumerInfo {
+        /// Kafka name for the consumer
+        consumer_name: String,
+        /// Materialize source identifier
+        source_id: SourceInstanceId,
+        /// The Kafka partition ID for these metrics (may be multiple per consumer)
+        partition_id: String,
+        /// Number of message sets received from Brokers
+        rxmsgs: i64,
+        /// Number of bytes received from Brokers
+        rxbytes: i64,
+        /// Number of message sets sent to Brokers
+        txmsgs: i64,
+        /// Number of bytes transmitted to Brokers
+        txbytes: i64,
+        /// Partition's low watermark offset on the broker
+        lo_offset: i64,
+        /// Partition's high watermark offset on the broker
+        hi_offset: i64,
+        /// Last stable offset on the broker
+        ls_offset: i64,
+        /// How far into the topic our consumer has read
+        app_offset: i64,
+        /// How many messages remain until our consumer reaches the (hi|lo) watermark
+        consumer_lag: i64,
+    },
     /// Peek command, true for install and false for retire.
     Peek(Peek, bool),
     /// Tracks the source name, id, partition id, and received/ingested offsets
@@ -103,9 +132,10 @@ pub fn construct<A: Allocate>(
     let mut input = demux.new_input(&logs, Pipeline);
     let (mut dataflow_out, dataflow) = demux.new_output();
     let (mut dependency_out, dependency) = demux.new_output();
+    let (mut frontier_out, frontier) = demux.new_output();
+    let (mut kafka_consumer_info_out, kafka_consumer_info) = demux.new_output();
     let (mut peek_out, peek) = demux.new_output();
     let (mut source_info_out, source_info) = demux.new_output();
-    let (mut frontier_out, frontier) = demux.new_output();
 
     let mut demux_buffer = Vec::new();
     demux.build(move |_capability| {
@@ -114,18 +144,20 @@
         move |_frontiers| {
             let mut dataflow = dataflow_out.activate();
             let mut dependency = dependency_out.activate();
+            let mut frontier = frontier_out.activate();
+            let mut kafka_consumer_info = kafka_consumer_info_out.activate();
             let mut peek = peek_out.activate();
             let mut source_info = source_info_out.activate();
-            let mut frontier = frontier_out.activate();
 
             input.for_each(|time, data| {
                 data.swap(&mut demux_buffer);
 
                 let mut dataflow_session = dataflow.session(&time);
                 let mut dependency_session = dependency.session(&time);
+                let mut frontier_session = frontier.session(&time);
+                let mut kafka_consumer_info_session = kafka_consumer_info.session(&time);
                 let mut peek_session = peek.session(&time);
                 let mut source_info_session = source_info.session(&time);
-                let mut frontier_session = frontier.session(&time);
 
                 for (time, worker, datum) in demux_buffer.drain(..) {
                     let time_ns = time.as_nanos() as Timestamp;
@@ -176,6 +208,47 @@
                             ),
                         }
                     }
+                    MaterializedEvent::Frontier(name, logical, delta) => {
+                        frontier_session.give((
+                            row_packer.pack(&[
+                                Datum::String(&name.to_string()),
+                                Datum::Int64(worker as i64),
+                                Datum::Int64(logical as i64),
+                            ]),
+                            time_ms,
+                            delta as isize,
+                        ));
+                    }
+                    MaterializedEvent::KafkaConsumerInfo {
+                        consumer_name,
+                        source_id,
+                        partition_id,
+                        rxmsgs,
+                        rxbytes,
+                        txmsgs,
+                        txbytes,
+                        lo_offset,
+                        hi_offset,
+                        ls_offset,
+                        app_offset,
+                        consumer_lag,
+                    } => {
+                        kafka_consumer_info_session.give((
+                            (consumer_name, source_id, partition_id),
+                            time_ms,
+                            DiffVector::new(vec![
+                                rxmsgs,
+                                rxbytes,
+                                txmsgs,
+                                txbytes,
+                                lo_offset,
+                                hi_offset,
+                                ls_offset,
+                                app_offset,
+                                consumer_lag,
+                            ]),
+                        ));
+                    }
                     MaterializedEvent::Peek(peek, is_install) => {
                         peek_session.give((peek, worker, is_install, time_ns))
                     }
@@ -192,17 +265,6 @@
                             DiffPair::new(offset, timestamp),
                         ));
                     }
-                    MaterializedEvent::Frontier(name, logical, delta) => {
-                        frontier_session.give((
-                            row_packer.pack(&[
-                                Datum::String(&name.to_string()),
-                                Datum::Int64(worker as i64),
-                                Datum::Int64(logical as i64),
-                            ]),
-                            time_ms,
-                            delta as isize,
-                        ));
-                    }
                 }
             }
         });
@@ -245,6 +307,30 @@
         }
     });
 
+    let frontier_current = frontier.as_collection();
+
+    use differential_dataflow::operators::Count;
+    let kafka_consumer_info_current = kafka_consumer_info.as_collection().count().map({
+        let mut row_packer = repr::RowPacker::new();
+        move |((consumer_name, source_id, partition_id), diff_vector)| {
+            row_packer.pack(&[
+                Datum::String(&consumer_name),
+                Datum::String(&source_id.source_id.to_string()),
+                Datum::Int64(source_id.dataflow_id as i64),
+                Datum::String(&partition_id),
+                Datum::Int64(diff_vector[0]),
+                Datum::Int64(diff_vector[1]),
+                Datum::Int64(diff_vector[2]),
+                Datum::Int64(diff_vector[3]),
+                Datum::Int64(diff_vector[4]),
+                Datum::Int64(diff_vector[5]),
+                Datum::Int64(diff_vector[6]),
+                Datum::Int64(diff_vector[7]),
+                Datum::Int64(diff_vector[8]),
+            ])
+        }
+    });
+
     let peek_current = peek
         .map(move |(name, worker, is_install, time_ns)| {
             let time_ms = (time_ns / 1_000_000) as Timestamp;
@@ -265,7 +351,6 @@
         }
     });
 
-    use differential_dataflow::operators::Count;
     let source_info_current = source_info.as_collection().count().map({
         let mut row_packer = repr::RowPacker::new();
         move |((name, id, pid), pair)| {
@@ -282,8 +367,6 @@
         }
     });
 
-    let frontier_current = frontier.as_collection();
-
     // Duration statistics derive from the non-rounded event times.
     let peek_duration = peek
         .unary(
@@ -361,6 +444,10 @@
             LogVariant::Materialized(MaterializedLog::FrontierCurrent),
             frontier_current,
         ),
+        (
+            LogVariant::Materialized(MaterializedLog::KafkaConsumerInfo),
+            kafka_consumer_info_current,
+        ),
         (
             LogVariant::Materialized(MaterializedLog::PeekCurrent),
             peek_current,
```
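Because each statistics event folds its nine counters into a DiffVector and count() re-emits the accumulated values per (consumer_name, source_id, partition_id) key, each key carries its accumulated counters. A monitoring sketch over the resulting relation:

```sql
-- Outstanding consumer lag per source, worst first.
SELECT source_id, sum(consumer_lag) AS total_lag
FROM mz_catalog.mz_kafka_consumer_statistics
GROUP BY source_id
ORDER BY total_lag DESC;
```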

src/dataflow/src/source/file.rs

Lines changed: 3 additions & 0 deletions

```diff
@@ -26,6 +26,7 @@ use expr::{PartitionId, SourceInstanceId};
 use mz_avro::types::Value;
 use mz_avro::{AvroRead, Schema, Skip};
 
+use crate::logging::materialized::Logger;
 use crate::source::{NextMessage, SourceMessage, SourceReader};
 
 /// Contains all information necessary to ingest data from file sources (either
@@ -63,6 +64,7 @@ impl SourceReader<Value> for FileSourceReader<Value> {
         consumer_activator: SyncActivator,
         connector: ExternalSourceConnector,
         encoding: DataEncoding,
+        _: Option<Logger>,
     ) -> Result<(FileSourceReader<Value>, Option<PartitionId>), anyhow::Error> {
         let receiver = match connector {
             ExternalSourceConnector::AvroOcf(oc) => {
@@ -139,6 +141,7 @@
         consumer_activator: SyncActivator,
         connector: ExternalSourceConnector,
         _: DataEncoding,
+        _: Option<Logger>,
     ) -> Result<(FileSourceReader<Vec<u8>>, Option<PartitionId>), anyhow::Error> {
         let receiver = match connector {
             ExternalSourceConnector::File(fc) => {
```
