Skip to content

[KIP-848] Static membership test updates with regex changes #5031

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
230 changes: 220 additions & 10 deletions tests/0102-static_group_rebalance.c
Original file line number Diff line number Diff line change
Expand Up @@ -118,13 +118,14 @@ static void rebalance_cb(rd_kafka_t *rk,

c->partition_cnt = parts->cnt;
c->assigned_at = test_clock();
rd_kafka_assign(rk, parts);

test_consumer_assign_by_rebalance_protocol("rebalance", rk,
parts);
break;

case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
c->revoked_at = test_clock();
rd_kafka_assign(rk, NULL);
test_consumer_unassign_by_rebalance_protocol("rebalance", rk,
parts);
TEST_SAY("line %d: %s revoked %d partitions\n", c->curr_line,
rd_kafka_name(c->rk), parts->cnt);

Expand All @@ -143,7 +144,7 @@ static void rebalance_cb(rd_kafka_t *rk,
}


static void do_test_static_group_rebalance(void) {
static void do_test_static_group_rebalance_classic(void) {
rd_kafka_conf_t *conf;
test_msgver_t mv;
int64_t rebalance_start;
Expand Down Expand Up @@ -413,6 +414,219 @@ static void do_test_static_group_rebalance(void) {
}


static void do_test_static_group_rebalance_consumer(void) {
rd_kafka_conf_t *conf;
test_msgver_t mv;
int64_t rebalance_start;
_consumer_t c[_CONSUMER_CNT] = RD_ZERO_INIT;
const int msgcnt = 100;
uint64_t testid = test_id_generate();
const char *topic =
test_mk_topic_name("0102_static_group_rebalance", 1);
char *topics = rd_strdup(tsprintf("^%s.*", topic));

SUB_TEST();

test_conf_init(&conf, NULL, 70);
test_msgver_init(&mv, testid);
c[0].mv = &mv;
c[1].mv = &mv;

test_create_topic_wait_exists(NULL, topic, 3, 1, 5000);
test_produce_msgs_easy(topic, testid, RD_KAFKA_PARTITION_UA, msgcnt);

test_conf_set(conf, "max.poll.interval.ms", "9000");
test_conf_set(conf, "auto.offset.reset", "earliest");
test_conf_set(conf, "topic.metadata.refresh.interval.ms", "500");
test_conf_set(conf, "metadata.max.age.ms", "5000");
test_conf_set(conf, "enable.partition.eof", "true");
test_conf_set(conf, "group.instance.id", "consumer1");
test_conf_set(conf, "partition.assignment.strategy",
"cooperative-sticky");

rd_kafka_conf_set_opaque(conf, &c[0]);
c[0].rk = test_create_consumer(topic, rebalance_cb,
rd_kafka_conf_dup(conf), NULL);

rd_kafka_conf_set_opaque(conf, &c[1]);
test_conf_set(conf, "group.instance.id", "consumer2");
c[1].rk = test_create_consumer(topic, rebalance_cb,
rd_kafka_conf_dup(conf), NULL);
rd_kafka_conf_destroy(conf);

test_wait_topic_exists(c[1].rk, topic, 5000);

test_consumer_subscribe(c[0].rk, topics);
test_consumer_subscribe(c[1].rk, topics);

/*
* Static members enforce `max.poll.interval.ms` which may prompt
* an unwanted rebalance while the other consumer awaits its assignment.
* These members remain in the member list however so we must
* interleave calls to poll while awaiting our assignment to avoid
* unexpected rebalances being triggered.
*/
rebalance_start = test_clock();
c[0].expected_rb_event = RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS;
c[1].expected_rb_event = RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS;
while (!static_member_wait_rebalance(&c[0], rebalance_start,
&c[0].assigned_at, 1000)) {
/* keep consumer 2 alive while consumer 1 awaits
* its assignment
*/
c[1].curr_line = __LINE__;
test_consumer_poll_once(c[1].rk, &mv, 0);
}

static_member_expect_rebalance(&c[1], rebalance_start,
&c[1].assigned_at, -1);

/*
* Consume all the messages so we can watch for duplicates
* after rejoin/rebalance operations.
*/
c[0].curr_line = __LINE__;
test_consumer_poll("serve.queue", c[0].rk, testid, c[0].partition_cnt,
0, -1, &mv);
c[1].curr_line = __LINE__;
test_consumer_poll("serve.queue", c[1].rk, testid, c[1].partition_cnt,
0, -1, &mv);

test_msgver_verify("first.verify", &mv, TEST_MSGVER_ALL, 0, msgcnt);

TEST_SAY("== Testing consumer restart ==\n");
conf = rd_kafka_conf_dup(rd_kafka_conf(c[1].rk));

/* Only c[1] should exhibit rebalance behavior */
c[1].expected_rb_event = RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS;
test_consumer_close(c[1].rk);
rd_kafka_destroy(c[1].rk);

c[1].rk = test_create_handle(RD_KAFKA_CONSUMER, conf);
rd_kafka_poll_set_consumer(c[1].rk);

test_consumer_subscribe(c[1].rk, topics);

/* Await assignment */
c[1].expected_rb_event = RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS;
rebalance_start = test_clock();
while (!static_member_wait_rebalance(&c[1], rebalance_start,
&c[1].assigned_at, 1000)) {
c[0].curr_line = __LINE__;
test_consumer_poll_once(c[0].rk, &mv, 0);
}


TEST_SAY("== Testing subscription expansion ==\n");

/*
* New topics matching the subscription pattern should cause
* group rebalance
*/
test_create_topic_wait_exists(c->rk, tsprintf("%snew", topic), 1, 1,
5000);

rebalance_start = test_clock();
/* Await assignment */
// The partition can be assigned to any of the consumers.
c[0].expected_rb_event = RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS;
c[1].expected_rb_event = RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS;
int wait_c0_rebalance = 1, wait_c1_rebalance = 1;
while (wait_c0_rebalance && wait_c1_rebalance) {
int c0_rebalance = 0, c1_rebalance = 0;
if (wait_c0_rebalance) {
c0_rebalance = static_member_wait_rebalance(
&c[0], rebalance_start, &c[0].assigned_at, 1000);
} else {
c[0].curr_line = __LINE__;
test_consumer_poll_once(c[0].rk, &mv, 0);
}

if (wait_c1_rebalance) {
c1_rebalance = static_member_wait_rebalance(
&c[1], rebalance_start, &c[1].assigned_at, 1000);
} else {
c[1].curr_line = __LINE__;
test_consumer_poll_once(c[1].rk, &mv, 0);
}

wait_c0_rebalance = wait_c0_rebalance && !c0_rebalance;
wait_c1_rebalance = wait_c1_rebalance && !c1_rebalance;
}

TEST_SAY("== Testing consumer unsubscribe ==\n");

/* Unsubscribe should send a LeaveGroupRequest invoking a rebalance */

/* Send LeaveGroup incrementing generation by 1 */
rebalance_start = test_clock();
rd_kafka_unsubscribe(c[1].rk);

/* Await revocation */
c[1].expected_rb_event = RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS;
static_member_expect_rebalance(&c[1], rebalance_start, &c[1].revoked_at,
-1);

/* Send JoinGroup bumping generation by 1 */
rebalance_start = test_clock();
test_consumer_subscribe(c[1].rk, topics);

c[1].expected_rb_event = RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS;
static_member_expect_rebalance(&c[1], rebalance_start,
&c[1].assigned_at, -1);

TEST_SAY("== Testing max poll violation ==\n");
/* max.poll.interval.ms should still be enforced by the consumer */

/*
* Block long enough for consumer 2 to be evicted from the group
* `max.poll.interval.ms` + `session.timeout.ms`
*/

rebalance_start = test_clock();
c[0].curr_line = __LINE__;
test_consumer_poll_no_msgs("wait.max.poll", c[0].rk, testid,
1000 + 9000 /* max.poll.interval.ms*/);

c[1].curr_line = __LINE__;
test_consumer_poll_expect_err(c[1].rk, testid, 1000,
RD_KAFKA_RESP_ERR__MAX_POLL_EXCEEDED);

// Member is fenced due to max.poll.interval.ms exceeded and hence the
// partition is revoked
c[1].expected_rb_event = RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS;
c[1].curr_line = __LINE__;
static_member_expect_rebalance(&c[1], rebalance_start, &c[1].revoked_at,
-1);

// The member tries to rejoin once it is fenced due to
// max.poll.interval.ms exceeded
c[1].expected_rb_event = RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS;
c[1].curr_line = __LINE__;
static_member_expect_rebalance(&c[1], rebalance_start,
&c[1].assigned_at, -1);


// Session timeout is not local to the client in `consumer` group
// protocol.
c[0].expected_rb_event = RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS;
c[0].curr_line = __LINE__;
test_consumer_close(c[0].rk);
rd_kafka_destroy(c[0].rk);

c[1].expected_rb_event = RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS;
c[1].curr_line = __LINE__;
test_consumer_close(c[1].rk);
rd_kafka_destroy(c[1].rk);

test_msgver_verify("final.validation", &mv, TEST_MSGVER_ALL, 0, msgcnt);
test_msgver_clear(&mv);
free(topics);

SUB_TEST_PASS();
}


/**
* @brief Await a non-empty assignment for all consumers in \p c
*/
Expand Down Expand Up @@ -783,15 +997,11 @@ static void do_test_static_membership_mock(int variation) {
}

int main_0102_static_group_rebalance(int argc, char **argv) {
/* TODO: check again when regexes
* will be supported by KIP-848 */
if (test_consumer_group_protocol_classic()) {
do_test_static_group_rebalance();
}

if (test_consumer_group_protocol_classic()) {
do_test_static_group_rebalance_classic();
do_test_fenced_member_classic();
} else {
do_test_static_group_rebalance_consumer();
do_test_fenced_member_consumer();
}

Expand Down