Capture RDKafka Statistics as an internal built-in log #6131
@@ -11,7 +11,8 @@
 use std::time::Duration;

-use differential_dataflow::{difference::DiffPair, operators::count::CountTotal};
+use differential_dataflow::difference::{DiffPair, DiffVector};
+use differential_dataflow::operators::count::CountTotal;
 use log::error;
 use timely::communication::Allocate;
 use timely::dataflow::operators::capture::EventLink;
@@ -38,6 +39,34 @@ pub enum MaterializedEvent {
         /// Globally unique identifier for the source on which the dataflow depends.
         source: GlobalId,
     },
+    /// Tracks statistics for a particular Kafka consumer / partition pair
+    /// Reference: https://github.com/edenhill/librdkafka/blob/master/STATISTICS.md
+    KafkaConsumerInfo {
+        /// Kafka name for the consumer
+        consumer_name: String,
+        /// Materialize source identifier
+        source_id: SourceInstanceId,
+        /// The Kafka partition ID for these metrics (may be multiple per consumer)
+        partition_id: String,
+        /// Number of message sets received from Brokers
+        rxmsgs: i64,
+        /// Number of bytes received from Brokers
+        rxbytes: i64,
+        /// Number of message sets sent to Brokers
+        txmsgs: i64,
+        /// Number of bytes transmitted to Brokers
+        txbytes: i64,
+        /// Partition's low watermark offset on the broker
+        lo_offset: i64,
+        /// Partition's high watermark offset on the broker
+        hi_offset: i64,
+        /// Last stable offset on the broker
+        ls_offset: i64,
+        /// How far into the topic our consumer has read
+        app_offset: i64,
+        /// How many messages remain until our consumer reaches the (hi|lo) watermark
+        consumer_lag: i64,
+    },
     /// Peek command, true for install and false for retire.
     Peek(Peek, bool),
     /// Tracks the source name, id, partition id, and received/ingested offsets
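The enum only defines the shape of the event; the numbers themselves come from librdkafka's periodic statistics callback (driven by statistics.interval.ms). As a rough sketch of the producer side, assuming the rdkafka crate's ClientContext::stats hook and a hypothetical logger handle (neither the LoggingContext type nor its fields appear in this diff):

use rdkafka::{ClientContext, Statistics};

struct LoggingContext {
    logger: Logger,               // assumed handle into the materialized event log
    source_id: SourceInstanceId,  // identifies this source instantiation
}

impl ClientContext for LoggingContext {
    // Invoked by rdkafka on every statistics tick.
    fn stats(&self, stats: Statistics) {
        for topic in stats.topics.values() {
            for p in topic.partitions.values() {
                // Note: because these events feed a differential collection
                // that is later count()ed, a real implementation would log
                // the change since the previously reported statistics; that
                // bookkeeping is elided here.
                self.logger.log(MaterializedEvent::KafkaConsumerInfo {
                    consumer_name: stats.name.clone(),
                    source_id: self.source_id.clone(),
                    partition_id: p.partition.to_string(),
                    rxmsgs: p.rxmsgs,
                    rxbytes: p.rxbytes,
                    txmsgs: p.txmsgs,
                    txbytes: p.txbytes,
                    lo_offset: p.lo_offset,
                    hi_offset: p.hi_offset,
                    ls_offset: p.ls_offset,
                    app_offset: p.app_offset,
                    consumer_lag: p.consumer_lag,
                });
            }
        }
    }
}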
@@ -103,9 +132,10 @@ pub fn construct<A: Allocate>(
     let mut input = demux.new_input(&logs, Pipeline);
     let (mut dataflow_out, dataflow) = demux.new_output();
     let (mut dependency_out, dependency) = demux.new_output();
+    let (mut frontier_out, frontier) = demux.new_output();
+    let (mut kafka_consumer_info_out, kafka_consumer_info) = demux.new_output();
     let (mut peek_out, peek) = demux.new_output();
     let (mut source_info_out, source_info) = demux.new_output();
-    let (mut frontier_out, frontier) = demux.new_output();

     let mut demux_buffer = Vec::new();
     demux.build(move |_capability| {
@@ -114,18 +144,20 @@ pub fn construct<A: Allocate>(
         move |_frontiers| {
             let mut dataflow = dataflow_out.activate();
             let mut dependency = dependency_out.activate();
+            let mut frontier = frontier_out.activate();
+            let mut kafka_consumer_info = kafka_consumer_info_out.activate();
             let mut peek = peek_out.activate();
             let mut source_info = source_info_out.activate();
-            let mut frontier = frontier_out.activate();

             input.for_each(|time, data| {
                 data.swap(&mut demux_buffer);

                 let mut dataflow_session = dataflow.session(&time);
                 let mut dependency_session = dependency.session(&time);
+                let mut frontier_session = frontier.session(&time);
+                let mut kafka_consumer_info_session = kafka_consumer_info.session(&time);
                 let mut peek_session = peek.session(&time);
                 let mut source_info_session = source_info.session(&time);
-                let mut frontier_session = frontier.session(&time);

                 for (time, worker, datum) in demux_buffer.drain(..) {
                     let time_ns = time.as_nanos() as Timestamp;
@@ -176,6 +208,47 @@ pub fn construct<A: Allocate>(
                             ),
                         }
                     }
+                    MaterializedEvent::Frontier(name, logical, delta) => {
+                        frontier_session.give((
+                            row_packer.pack(&[
+                                Datum::String(&name.to_string()),
+                                Datum::Int64(worker as i64),
+                                Datum::Int64(logical as i64),
+                            ]),
+                            time_ms,
+                            delta as isize,
+                        ));
+                    }
+                    MaterializedEvent::KafkaConsumerInfo {
+                        consumer_name,
+                        source_id,
+                        partition_id,
+                        rxmsgs,
+                        rxbytes,
+                        txmsgs,
+                        txbytes,
+                        lo_offset,
+                        hi_offset,
+                        ls_offset,
+                        app_offset,
+                        consumer_lag,
+                    } => {
+                        kafka_consumer_info_session.give((
+                            (consumer_name, source_id, partition_id),
+                            time_ms,
+                            DiffVector::new(vec![
+                                rxmsgs,
+                                rxbytes,
+                                txmsgs,
+                                txbytes,
+                                lo_offset,
+                                hi_offset,
+                                ls_offset,
+                                app_offset,
+                                consumer_lag,
+                            ]),
+                        ));
+                    }
                     MaterializedEvent::Peek(peek, is_install) => {
                         peek_session.give((peek, worker, is_install, time_ns))
                     }
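Packing all nine counters into one DiffVector lets a single differential collection carry them: DiffVector is a difference type whose accumulation is element-wise. A standalone sketch of that semantics, assuming the Semigroup::plus_equals method from the differential-dataflow version in use here:

use differential_dataflow::difference::{DiffVector, Semigroup};

fn main() {
    // Two successive updates for the same (consumer, source, partition)
    // key accumulate element-wise, so the running sum of deltas is the
    // current value of each counter.
    let mut acc = DiffVector::new(vec![5i64, 100, 0, 0, 0, 10, 10, 3, 7]);
    let next = DiffVector::new(vec![2i64, 40, 0, 0, 0, 2, 2, 4, -2]);
    acc.plus_equals(&next);
    assert_eq!(acc[0], 7); // rxmsgs: 5 + 2
    assert_eq!(acc[8], 5); // consumer_lag: 7 - 2
}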
@@ -192,17 +265,6 @@ pub fn construct<A: Allocate>(
                             DiffPair::new(offset, timestamp),
                         ));
                     }
-                    MaterializedEvent::Frontier(name, logical, delta) => {
-                        frontier_session.give((
-                            row_packer.pack(&[
-                                Datum::String(&name.to_string()),
-                                Datum::Int64(worker as i64),
-                                Datum::Int64(logical as i64),
-                            ]),
-                            time_ms,
-                            delta as isize,
-                        ));
-                    }
                 }
             }
         });
@@ -245,6 +307,30 @@ pub fn construct<A: Allocate>(
             }
         });

+    let frontier_current = frontier.as_collection();
+
+    use differential_dataflow::operators::Count;
+    let kafka_consumer_info_current = kafka_consumer_info.as_collection().count().map({
+        let mut row_packer = repr::RowPacker::new();
+        move |((consumer_name, source_id, partition_id), diff_vector)| {
+            row_packer.pack(&[
+                Datum::String(&consumer_name),
+                Datum::String(&source_id.source_id.to_string()),
+                Datum::Int64(source_id.dataflow_id as i64),
+                Datum::String(&partition_id),
+                Datum::Int64(diff_vector[0]),
+                Datum::Int64(diff_vector[1]),
+                Datum::Int64(diff_vector[2]),
+                Datum::Int64(diff_vector[3]),
+                Datum::Int64(diff_vector[4]),
+                Datum::Int64(diff_vector[5]),
+                Datum::Int64(diff_vector[6]),
+                Datum::Int64(diff_vector[7]),
+                Datum::Int64(diff_vector[8]),
+            ])
+        }
+    });
+
     let peek_current = peek
         .map(move |(name, worker, is_install, time_ns)| {
             let time_ms = (time_ns / 1_000_000) as Timestamp;

Inline review thread on the source_id columns above:

Review comment: I'm not sure if this is correct now, I think that you need both source_id and dataflow_id to uniquely identify these metrics?

Reply: Oh, you're right! I just pushed a fix for this.
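For readers unfamiliar with Count over a custom difference type: count() moves the accumulated DiffVector out of the difference position and into the data, yielding one ((key, totals), time, 1) update per key, which the map above then packs into a row. A self-contained sketch of that behavior (the data values are invented for illustration):

use differential_dataflow::difference::DiffVector;
use differential_dataflow::input::Input;
use differential_dataflow::operators::Count;

fn main() {
    timely::execute_directly(move |worker| {
        let mut input = worker.dataflow::<u64, _, _>(|scope| {
            let (input, stats) =
                scope.new_collection::<(String, String), DiffVector<i64>>();
            // count() surfaces the element-wise accumulation as data:
            // ((key, accumulated DiffVector), time, 1) per distinct key.
            stats.count().inspect(|x| println!("{:?}", x));
            input
        });

        // Two deltas for the same (consumer, partition) key consolidate
        // into a single record carrying their element-wise sums.
        input.update(
            ("consumer-1".to_string(), "0".to_string()),
            DiffVector::new(vec![5, 100, 0, 0, 0, 10, 10, 3, 7]),
        );
        input.update(
            ("consumer-1".to_string(), "0".to_string()),
            DiffVector::new(vec![2, 40, 0, 0, 0, 2, 2, 4, -2]),
        );
        input.advance_to(1);
    });
}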
@@ -265,7 +351,6 @@ pub fn construct<A: Allocate>(
         }
     });

-    use differential_dataflow::operators::Count;
     let source_info_current = source_info.as_collection().count().map({
         let mut row_packer = repr::RowPacker::new();
         move |((name, id, pid), pair)| {
@@ -282,8 +367,6 @@ pub fn construct<A: Allocate>(
         }
     });

-    let frontier_current = frontier.as_collection();
-
     // Duration statistics derive from the non-rounded event times.
     let peek_duration = peek
         .unary(
@@ -361,6 +444,10 @@ pub fn construct<A: Allocate>(
         (
            LogVariant::Materialized(MaterializedLog::FrontierCurrent),
            frontier_current,
         ),
+        (
+            LogVariant::Materialized(MaterializedLog::KafkaConsumerInfo),
+            kafka_consumer_info_current,
+        ),
         (
            LogVariant::Materialized(MaterializedLog::PeekCurrent),
            peek_current,
Review comment: I think that this will generate a string that has a global_id and instance_id inline, e.g. u0/1, where the global id tracks the source and the instance ID tracks the per-worker/per-instantiation key. I think it'd be better to split these out into two columns, because then the source_id here would be directly comparable to the GlobalIds in other MaterializedEvent variants.
variants.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I noticed that but didn't see any easy way to split this out. Let me look again!
Reply: Both the fields are pub, I think just renaming this to source: GlobalId, source_instance: u64 should do it.
Reply: Done! I missed that SourceInfo does &id.source_id.to_string(). Updated kafka_consumer_info to do the same. I also added a test case to verify the join behavior.
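Concretely, the suggestion amounts to carrying the two public halves of SourceInstanceId separately. A hypothetical rendering of that shape (the final diff instead keeps SourceInstanceId on the event and unpacks it into two columns when packing the row, as shown above):

/// Hypothetical: the reviewer's suggested split, shown as a standalone type.
pub struct ConsumerKey {
    /// Comparable to the GlobalIds in other MaterializedEvent variants, e.g. "u0".
    pub source: GlobalId,
    /// Per-worker / per-instantiation key, e.g. the "1" in "u0/1".
    pub source_instance: u64,
}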