Skip to content

Fix up to a 1s delay in first produce to new topic #5032

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,20 @@
# librdkafka v2.10.1

librdkafka v2.10.1 is a maintenance release:

* Fix an issue where the first message to any topic produced via `producev` or
`produceva` was delivered late (by up to 1 second) (#5032).

## Fixes

### Producer fixes

* In case of `producev` or `produceva`, the producer did not enqueue a leader
query metadata request immediately, and rather, waited for the 1 second
timer to kick in. This could cause delays in the sending of the first message
by up to 1 second. (#5032).


# librdkafka v2.10.0

librdkafka v2.10.0 is a feature release:
Expand Down
12 changes: 9 additions & 3 deletions src/rdkafka_msg.c
Original file line number Diff line number Diff line change
Expand Up @@ -383,6 +383,7 @@ rd_kafka_produceva(rd_kafka_t *rk, const rd_kafka_vu_t *vus, size_t cnt) {
rd_kafka_error_t *error = NULL;
rd_kafka_headers_t *hdrs = NULL;
rd_kafka_headers_t *app_hdrs = NULL; /* App-provided headers list */
int existing = 0;
size_t i;

if (unlikely(rd_kafka_check_produce(rk, &error)))
Expand All @@ -392,8 +393,10 @@ rd_kafka_produceva(rd_kafka_t *rk, const rd_kafka_vu_t *vus, size_t cnt) {
const rd_kafka_vu_t *vu = &vus[i];
switch (vu->vtype) {
case RD_KAFKA_VTYPE_TOPIC:
rkt =
rd_kafka_topic_new0(rk, vu->u.cstr, NULL, NULL, 1);
rkt = rd_kafka_topic_new0(rk, vu->u.cstr, NULL,
&existing, 1);
if (!existing)
rd_kafka_topic_leader_query(rk, rkt);
break;

case RD_KAFKA_VTYPE_RKT:
Expand Down Expand Up @@ -549,6 +552,7 @@ rd_kafka_resp_err_t rd_kafka_producev(rd_kafka_t *rk, ...) {
rd_kafka_resp_err_t err;
rd_kafka_headers_t *hdrs = NULL;
rd_kafka_headers_t *app_hdrs = NULL; /* App-provided headers list */
int existing = 0;

if (unlikely((err = rd_kafka_check_produce(rk, NULL))))
return err;
Expand All @@ -559,7 +563,9 @@ rd_kafka_resp_err_t rd_kafka_producev(rd_kafka_t *rk, ...) {
switch (vtype) {
case RD_KAFKA_VTYPE_TOPIC:
rkt = rd_kafka_topic_new0(rk, va_arg(ap, const char *),
NULL, NULL, 1);
NULL, &existing, 1);
if (!existing)
rd_kafka_topic_leader_query(rk, rkt);
break;

case RD_KAFKA_VTYPE_RKT:
Expand Down
144 changes: 144 additions & 0 deletions tests/0055-producer_latency.c
Original file line number Diff line number Diff line change
Expand Up @@ -365,3 +365,147 @@ int main_0055_producer_latency(int argc, char **argv) {

return 0;
}

/**
 * @brief Delivery report callback used by the first-message latency test.
 *
 * The per-message opaque (rkmessage->_private) is expected to point to the
 * test_timing_t that was started right before the produce call. The timer
 * is stopped here and checked against the [0, 100] range passed to
 * TIMING_ASSERT_LATER (presumably milliseconds - confirm against test.h).
 * The message must also have been delivered without error at offset 0.
 *
 * @param rk        Producer instance (unused).
 * @param rkmessage Delivery report for the produced message.
 * @param opaque    Global opaque (unused).
 */
static void dr_msg_cb_first_message(rd_kafka_t *rk,
                                    const rd_kafka_message_t *rkmessage,
                                    void *opaque) {
        test_timing_t *t_produce = (test_timing_t *)rkmessage->_private;
        TIMING_STOP(t_produce);
        /* The reason for setting such a low value is that both the mcluster and
         * the producer are running locally, and there is no linger. This
         * prevents the test from passing spuriously. */
        TIMING_ASSERT_LATER(t_produce, 0, 100);

        TEST_ASSERT(rkmessage->err == RD_KAFKA_RESP_ERR_NO_ERROR,
                    "expected no error, got %s",
                    rd_kafka_err2str(rkmessage->err));
        /* Report the actual offset on failure instead of a bare assertion. */
        TEST_ASSERT(rkmessage->offset == 0, "expected offset 0, got %" PRId64,
                    rkmessage->offset);
}

/**
* Test producer latency with a first message to a topic, that is delayed by a
* given amount of time after creating the producer.
*
* Cases:
* 0: rd_kafka_produce
* 1: rd_kafka_producev
* 2: rd_kafka_produceva
* 3: rd_kafka_produce_batch
*/
static void test_producer_latency_first_message(int wait_time_ms,
int case_number) {
rd_kafka_t *rk;
rd_kafka_conf_t *conf;
const char *topic = test_mk_topic_name("0055_producer_latency_mock", 1);
test_timing_t t_produce;
rd_kafka_resp_err_t err;
rd_kafka_mock_cluster_t *mcluster;
const char *bootstrap_servers;
const char *case_name = NULL;

if (case_number == 0) {
case_name = "rd_kafka_produce";
} else if (case_number == 1) {
case_name = "rd_kafka_producev";
} else if (case_number == 2) {
case_name = "rd_kafka_produceva";
} else if (case_number == 3) {
case_name = "rd_kafka_produce_batch";
}

SUB_TEST("Starting test for %s with wait time %d ms", case_name,
wait_time_ms);

mcluster = test_mock_cluster_new(1, &bootstrap_servers);
rd_kafka_mock_topic_create(mcluster, topic, 1, 1);

test_conf_init(&conf, NULL, 60);
test_conf_set(conf, "bootstrap.servers", bootstrap_servers);
test_conf_set(conf, "linger.ms", "0");
rd_kafka_conf_set_dr_msg_cb(conf, dr_msg_cb_first_message);
rk = test_create_handle(RD_KAFKA_PRODUCER, conf);

switch (case_number) {
case 0: {
char *payload = "value";
rd_kafka_topic_t *rkt = rd_kafka_topic_new(rk, topic, NULL);
int res;
rd_usleep(wait_time_ms * 1000ll, 0);
TIMING_START(&t_produce, "Produce message");

res = rd_kafka_produce(rkt, 0, RD_KAFKA_MSG_F_COPY, payload, 5,
NULL, 0, &t_produce);
TEST_ASSERT(res == 0, "expected no error");
break;
}
case 1: {
rd_usleep(wait_time_ms * 1000ll, 0);
TIMING_START(&t_produce, "Produce message");
rd_kafka_producev(
rk, RD_KAFKA_V_TOPIC(topic), RD_KAFKA_V_PARTITION(0),
RD_KAFKA_V_VALUE("value", 5),
RD_KAFKA_V_OPAQUE((&t_produce)), RD_KAFKA_V_END);
break;
}
case 2: {
rd_kafka_vu_t vus[3];
vus[0].vtype = RD_KAFKA_VTYPE_TOPIC;
vus[0].u.cstr = topic;
vus[1].vtype = RD_KAFKA_VTYPE_VALUE;
vus[1].u.mem.ptr = "value";
vus[1].u.mem.size = 5;
vus[2].vtype = RD_KAFKA_VTYPE_OPAQUE;
vus[2].u.ptr = &t_produce;

rd_usleep(wait_time_ms * 1000ll, 0);
TIMING_START(&t_produce, "Produce message");
rd_kafka_produceva(rk, vus, 3);
break;
}
case 3: {
rd_kafka_message_t rkmessages[1];
rd_kafka_topic_t *rkt = rd_kafka_topic_new(rk, topic, NULL);
int res;

rkmessages[0].payload = "value";
rkmessages[0].len = 5;
rkmessages[0]._private = &t_produce;

rd_usleep(wait_time_ms * 1000ll, 0);
TIMING_START(&t_produce, "Produce message");
res = rd_kafka_produce_batch(rkt, 0, RD_KAFKA_MSG_F_COPY,
rkmessages, 1);
TEST_ASSERT(res == 1, "expected 1 msg enqueued, got %d", res);
break;
}
}
rd_kafka_poll(rk, 0);
err = rd_kafka_flush(rk, 10000);
TEST_ASSERT(err == RD_KAFKA_RESP_ERR_NO_ERROR,
"expected all messages to be flushed, got %s",
rd_kafka_err2str(err));

rd_kafka_destroy(rk);
test_mock_cluster_destroy(mcluster);

SUB_TEST_PASS();
}

/**
 * @brief Entry point: run the first-message latency test for every
 *        combination of wait time and produce API case.
 *
 * @returns 0 (failures are reported through the test assertions).
 */
int main_0055_producer_latency_mock(int argc, char **argv) {
        /* The topic_scan_all timer runs every 1 second. Originally, the issue
         * was that the topic metadata was only fetched on the topic scan rather
         * than when we were queueing the message. Having a variety of times
         * makes sure that we don't get flaky passing tests due to coincidence.
         */
        static const int wait_time_mss[] = {500, 1200, 2500, 3700, 4900};
        /* Derive the count from the array so the loop bound cannot drift
         * out of sync when wait times are added or removed. */
        const size_t wait_time_cnt =
            sizeof(wait_time_mss) / sizeof(wait_time_mss[0]);
        /* NOTE(review): only cases 0..2 are run; case 3
         * (rd_kafka_produce_batch) is implemented by
         * test_producer_latency_first_message() but never exercised here.
         * Confirm whether that is intentional before raising this. */
        const int case_cnt = 3;
        size_t i;
        int case_number;

        for (i = 0; i < wait_time_cnt; i++) {
                for (case_number = 0; case_number < case_cnt; case_number++) {
                        test_producer_latency_first_message(wait_time_mss[i],
                                                            case_number);
                }
        }

        return 0;
}
2 changes: 2 additions & 0 deletions tests/test.c
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ _TEST_DECL(0053_stats_timing);
_TEST_DECL(0053_stats);
_TEST_DECL(0054_offset_time);
_TEST_DECL(0055_producer_latency);
_TEST_DECL(0055_producer_latency_mock);
_TEST_DECL(0056_balanced_group_mt);
_TEST_DECL(0057_invalid_topic);
_TEST_DECL(0058_log);
Expand Down Expand Up @@ -399,6 +400,7 @@ struct test tests[] = {
_TEST(0053_stats, 0),
_TEST(0054_offset_time, 0, TEST_BRKVER(0, 10, 1, 0)),
_TEST(0055_producer_latency, TEST_F_KNOWN_ISSUE_WIN32),
_TEST(0055_producer_latency_mock, 0, TEST_F_LOCAL),
_TEST(0056_balanced_group_mt, 0, TEST_BRKVER(0, 9, 0, 0)),
_TEST(0057_invalid_topic, 0, TEST_BRKVER(0, 9, 0, 0)),
_TEST(0058_log, TEST_F_LOCAL),
Expand Down