Skip to content

Commit

Permalink
Control message leader epoch test
Browse files Browse the repository at this point in the history
  • Loading branch information
emasab committed Sep 20, 2023
1 parent 49f180a commit 5fc10ab
Showing 1 changed file with 84 additions and 0 deletions.
84 changes: 84 additions & 0 deletions tests/0103-transactions.c
Original file line number Diff line number Diff line change
Expand Up @@ -1102,6 +1102,89 @@ static void do_test_empty_txn(rd_bool_t send_offsets, rd_bool_t do_commit) {
SUB_TEST_PASS();
}


/**
* @brief A control message should increase stored offset and
* that stored offset should have correct leader epoch
* an be included in commit.
*/
static void do_test_txn_abort_control_message_leader_epoch(void) {
const char *topic = test_mk_topic_name(__FUNCTION__, 1);

rd_kafka_t *p, *c;
rd_kafka_conf_t *p_conf, *c_conf;
test_msgver_t mv;
int exp_msg_cnt = 0;
uint64_t testid = test_id_generate();
rd_kafka_topic_partition_list_t *offsets;
int r;

SUB_TEST_QUICK();

test_conf_init(&p_conf, NULL, 30);
c_conf = rd_kafka_conf_dup(p_conf);

test_conf_set(p_conf, "transactional.id", topic);
rd_kafka_conf_set_dr_msg_cb(p_conf, test_dr_msg_cb);
p = test_create_handle(RD_KAFKA_PRODUCER, p_conf);

test_create_topic(p, topic, 1, 3);

TEST_CALL_ERROR__(rd_kafka_init_transactions(p, 5000));

TEST_CALL_ERROR__(rd_kafka_begin_transaction(p));

/* Produce one message */
test_produce_msgs2(p, topic, testid, RD_KAFKA_PARTITION_UA, 0, 1, NULL,
0);

/* Abort the transaction */
TEST_CALL_ERROR__(rd_kafka_abort_transaction(p, -1));

/**
* Create consumer.
*/
test_conf_set(c_conf, "enable.auto.commit", "false");
test_conf_set(c_conf, "group.id", topic);
test_conf_set(c_conf, "enable.partition.eof", "true");
test_conf_set(c_conf, "auto.offset.reset", "earliest");
test_msgver_init(&mv, testid);
c = test_create_consumer(topic, NULL, c_conf, NULL);


test_consumer_subscribe(c, topic);
/* Expect 0 messages and 1 EOF */
r = test_consumer_poll("consume.nothing", c, testid,
/* exp_eof_cnt */ 1,
/* exp_msg_base */ 0, exp_msg_cnt, &mv);
test_msgver_clear(&mv);

TEST_ASSERT(r == exp_msg_cnt, "expected %d messages, got %d",
exp_msg_cnt, r);

/* Commits offset 2 (1 aborted message + 1 control message) */
TEST_CALL_ERR__(rd_kafka_commit(c, NULL, rd_false));

offsets = rd_kafka_topic_partition_list_new(1);
rd_kafka_topic_partition_list_add(offsets, topic, 0);
rd_kafka_committed(c, offsets, -1);

/* Committed offset must be 2 */
TEST_ASSERT(offsets->cnt == 1, "expected 1 partition, got %d",
offsets->cnt);
TEST_ASSERT(offsets->elems[0].offset == 2,
"expected offset 2, got %" PRId64,
offsets->elems[0].offset);

/* All done */
test_consumer_close(c);
rd_kafka_topic_partition_list_destroy(offsets);
rd_kafka_destroy(c);
rd_kafka_destroy(p);

SUB_TEST_PASS();
}

/**
* @returns the high watermark for the given partition.
*/
Expand Down Expand Up @@ -1219,6 +1302,7 @@ int main_0103_transactions(int argc, char **argv) {
do_test_empty_txn(rd_true /*send offsets*/, rd_true /*commit*/);
do_test_empty_txn(rd_true /*send offsets*/, rd_false /*abort*/);
do_test_wmark_isolation_level();
do_test_txn_abort_control_message_leader_epoch();
return 0;
}

Expand Down

0 comments on commit 5fc10ab

Please sign in to comment.