You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
I'm not sure whether this is a bug in the library, an error of implementation on my part or simply intended behaviour.
Im building a containerized demo app for c++. one container produces 10 messages and then shuts down. The other container is always online and consumes indefinitely.
consider this consumer:
Properties consumer_properties({
{"bootstrap.servers", EnvConfig().kafka_config.brokers()},
{"group.id", EnvConfig().kafka_config.group_id()},
{"auto.offset.reset", EnvConfig().kafka_config.auto_offset_reset()}
});
KConsumer::KConsumer(Topic _topic){
KConsumer::topic = _topic;
};
void KConsumer::subscribe(std::function<void(std::string)> handle_message){
// Use Ctrl-C to terminate the program
signal(SIGINT, stopRunning); // NOLINT
//init consumer and assign partition
KafkaConsumer consumer(consumer_properties);
consumer.subscribe( {topic}, NullRebalanceCallback,std::chrono::milliseconds(30000));
// consume loop
while (running) {
// Poll messages from Kafka brokers
auto records = consumer.poll(std::chrono::milliseconds(100));
/*
FIXME: if there are no new records incoming for a while the consumer will be kicked off of the consumer group due to missing heartbeats.
Not sure if this is just a short-coming on the frameworks side or our implementation mistake.
However, the consumer will resume after a rebalance and maybe this is an intended behaviour?
*/
for (const auto& record: records) {
if (!record.error()) {
//TODO do this on debug/trace level
std::cout << "Got a new message..." << std::endl;
std::cout << " Topic : " << record.topic() << std::endl;
std::cout << " Partition: " << record.partition() << std::endl;
std::cout << " Offset : " << record.offset() << std::endl;
std::cout << " Timestamp: " << record.timestamp().toString() << std::endl;
std::cout << " Headers : " << toString(record.headers()) << std::endl;
std::cout << " Key [" << record.key().toString() << "]" << std::endl;
std::cout << " Value [" << record.value().toString() << "]" << std::endl;
handle_message(record.value().toString()); // evoke callback
} else {
std::cerr << record.toString() << std::endl;
}
}
//consumer.commitSync(); // auto.commit is enabled by default
}
}
now what will happen upon starting the producer and consumer. is that after the last message is consumed, the consumer will be kicked off of the consumer group after the heartbeat-timeout-intervall and a rebalancing is triggered.
container logs:
<...>
kafka-plot-interface-utp-sink-connector-1 | [2023-08-22 08:44:06.845688]NOTICE KafkaConsumer[2d773c79-c6f4ba27] re-balance event triggered[ASSIGN_PARTITIONS], cooperative[disabled], topic-partitions[test-0]
kafka-plot-interface-utp-sink-connector-1 | [2023-08-22 08:44:06.846237]NOTICE KafkaConsumer[2d773c79-c6f4ba27] subscribed, topics[test]
<...>
kafka-plot-interface-utp-sink-connector-1 | Got a new message...
kafka-plot-interface-utp-sink-connector-1 | Topic : test
kafka-plot-interface-utp-sink-connector-1 | Partition: 0
kafka-plot-interface-utp-sink-connector-1 | Offset : 75
kafka-plot-interface-utp-sink-connector-1 | Timestamp: CreateTime[2023-08-22 08:43:34.972]
kafka-plot-interface-utp-sink-connector-1 | Headers :
kafka-plot-interface-utp-sink-connector-1 | Key [[null]]
kafka-plot-interface-utp-sink-connector-1 | Value [Hello, World 10]
kafka-plot-interface-utp-sink-connector-1 | broadcasting message: Hello, World 10
kafka-plot-interface-utp-sink-connector-1 | message sent successfully
kafka-plot-interface-kafka-1 | [2023-08-22 08:44:51,793] INFO [GroupCoordinator 0]: Member ffb336d6-3f9b5c63-b26e474b-24ef-433e-84fb-a5e4e9468d26 in group utp_sink_connector has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
kafka-plot-interface-kafka-1 | [2023-08-22 08:44:51,794] INFO [GroupCoordinator 0]: Preparing to rebalance group utp_sink_connector in state PreparingRebalance with old generation 5 (__consumer_offsets-37) (reason: removing member ffb336d6-3f9b5c63-b26e474b-24ef-433e-84fb-a5e4e9468d26 on heartbeat expiration) (kafka.coordinator.group.GroupCoordinator)
kafka-plot-interface-utp-sink-connector-1 | [2023-08-22 08:44:51.851106]NOTICE KafkaConsumer[2d773c79-c6f4ba27] re-balance event triggered[REVOKE_PARTITIONS], cooperative[disabled], topic-partitions[test-0]
kafka-plot-interface-kafka-1 | [2023-08-22 08:44:51,852] INFO [GroupCoordinator 0]: Stabilized group utp_sink_connector generation 6 (__consumer_offsets-37) with 1 members (kafka.coordinator.group.GroupCoordinator)
kafka-plot-interface-kafka-1 | [2023-08-22 08:44:51,857] INFO [GroupCoordinator 0]: Assignment received from leader 2d773c79-c6f4ba27-00c8b2f2-89bf-45ab-9441-bbb2c0586bdf for group utp_sink_connector for generation 6. The group has 1 members, 0 of which are static. (kafka.coordinator.group.GroupCoordinator)
kafka-plot-interface-utp-sink-connector-1 | [2023-08-22 08:44:51.858916]NOTICE KafkaConsumer[2d773c79-c6f4ba27] re-balance event triggered[ASSIGN_PARTITIONS], cooperative[disabled], topic-partitions[test-0]
so TLDR:
I think consumers should send heartbeats in the background independed of their consumption loops. which they don't ?
The text was updated successfully, but these errors were encountered:
hi,
I'm not sure whether this is a bug in the library, an error of implementation on my part or simply intended behaviour.
Im building a containerized demo app for c++. one container produces 10 messages and then shuts down. The other container is always online and consumes indefinitely.
consider this consumer:
now what will happen upon starting the producer and consumer. is that after the last message is consumed, the consumer will be kicked off of the consumer group after the heartbeat-timeout-intervall and a rebalancing is triggered.
container logs:
so TLDR:
I think consumers should send heartbeats in the background independed of their consumption loops. which they don't ?
The text was updated successfully, but these errors were encountered: