Skip to content

Commit 83a2648

Browse files
committed
Fix api in kafka tests
Signed-off-by: Heinz N. Gies <heinz@licenser.net>
1 parent 50b1ad4 commit 83a2648

File tree

3 files changed

+18
-6
lines changed

3 files changed

+18
-6
lines changed

src/connectors/tests.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -332,7 +332,8 @@ impl ConnectorHarness {
332332
feature = "net-integration",
333333
feature = "ws-integration",
334334
feature = "s3-integration",
335-
feature = "gcp-integration"
335+
feature = "gcp-integration",
336+
feature = "kafka-integration",
336337
))]
337338
pub(crate) async fn send_to_sink(&self, event: Event, port: Port<'static>) -> Result<()> {
338339
self.addr.send_sink(SinkMsg::Event { event, port }).await

src/connectors/tests/kafka/consumer.rs

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ use rdkafka::{
2929
config::FromClientConfig,
3030
consumer::{BaseConsumer, Consumer},
3131
error::KafkaResult,
32-
message::OwnedHeaders,
32+
message::{Header, OwnedHeaders},
3333
producer::{FutureProducer, FutureRecord},
3434
ClientConfig, Offset,
3535
};
@@ -101,7 +101,10 @@ async fn transactional_retry() -> Result<()> {
101101
.key("foo")
102102
.partition(1)
103103
.timestamp(42)
104-
.headers(OwnedHeaders::new().add("header", "snot"));
104+
.headers(OwnedHeaders::new().insert(Header {
105+
key: "header",
106+
value: Some("snot"),
107+
}));
105108
if producer.send(record, PRODUCE_TIMEOUT).await.is_err() {
106109
return Err("Unable to send record to kafka".into());
107110
}
@@ -328,7 +331,10 @@ async fn custom_no_retry() -> Result<()> {
328331
.key("foo")
329332
.partition(1)
330333
.timestamp(42)
331-
.headers(OwnedHeaders::new().add("header", "snot"));
334+
.headers(OwnedHeaders::new().insert(Header {
335+
key: "header",
336+
value: Some("snot"),
337+
}));
332338
if producer.send(record, PRODUCE_TIMEOUT).await.is_err() {
333339
return Err("Unable to send record to kafka".into());
334340
}
@@ -530,7 +536,10 @@ async fn performance() -> Result<()> {
530536
.key("foo")
531537
.partition(1)
532538
.timestamp(42)
533-
.headers(OwnedHeaders::new().add("header", "snot"));
539+
.headers(OwnedHeaders::new().insert(Header {
540+
key: "header",
541+
value: Some("snot"),
542+
}));
534543
if producer.send(record, PRODUCE_TIMEOUT).await.is_err() {
535544
return Err("Unable to send record to kafka".into());
536545
}

src/connectors/tests/kafka/producer.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -174,7 +174,9 @@ async fn connector_kafka_producer() -> Result<()> {
174174
assert_eq!(Some(123), msg.timestamp().to_millis());
175175
let headers = msg.headers().expect("No headers found");
176176
assert_eq!(1, headers.count());
177-
assert_eq!(Some(("foo", "baz".as_bytes())), headers.get(0));
177+
let h = headers.get(0);
178+
assert_eq!("foo", h.key);
179+
assert_eq!("baz".as_bytes(), h.value.expect("no value"));
178180
consumer
179181
.commit_message(&msg, CommitMode::Sync)
180182
.expect("Commit failed");

0 commit comments

Comments
 (0)