I am experimenting with Kafka and Spring Boot.
- Spring Boot 2.1.0.RELEASE
- Spring-Kafka 2.2.0
I have a problem. If the topic contains unread messages, a consumer rebalance always starts when my application starts up.
I have a topic with 20 partitions and one KafkaListener with a concurrency of 8, and I configured a ThreadPoolTaskExecutor as follows:
@Bean
ThreadPoolTaskExecutor messageProcessorExecutor() {
    ThreadPoolTaskExecutor exec = new ThreadPoolTaskExecutor();
    exec.setCorePoolSize(15);
    exec.setMaxPoolSize(25);
    exec.setKeepAliveSeconds(60);
    exec.setThreadNamePrefix("kafkaConsumer-");
    return exec;
}
When the application starts, usually only kafkaConsumer-1 (sometimes kafkaConsumer-1 and kafkaConsumer-2) begins consuming, and for the other 7 consumers I see logs like the following:
2018-12-15 16:44:27.982 INFO 5551 --- [payment-group-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=payment-group-1] Attempt to heartbeat failed since group is rebalancing
2018-12-15 16:44:32.050 INFO 5551 --- [payment-group-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=payment-group-1] Attempt to heartbeat failed since group is rebalancing
2018-12-15 16:44:36.114 INFO 5551 --- [payment-group-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=payment-group-1] Attempt to heartbeat failed since group is rebalancing
....
2018-12-15 16:44:57.841 INFO 5551 --- [kafkaConsumer-2] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-3, groupId=payment-group-1] Group coordinator 127.0.0.1:9092 (id: 2147483646 rack: null) is unavailable or invalid, will attempt rediscovery
2018-12-15 16:44:57.902 INFO 5551 --- [kafkaConsumer-3] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-4, groupId=payment-group-1] Group coordinator 127.0.0.1:9092 (id: 2147483646 rack: null) is unavailable or invalid, will attempt rediscovery
2018-12-15 16:44:57.956 INFO 5551 --- [kafkaConsumer-2] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-3, groupId=payment-group-1] Discovered group coordinator 127.0.0.1:9092 (id: 2147483646 rack: null)
2018-12-15 16:44:57.956 INFO 5551 --- [kafkaConsumer-2] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-3, groupId=payment-group-1] (Re-)joining group
2018-12-15 16:44:57.999 INFO 5551 --- [kafkaConsumer-4] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-5, groupId=payment-group-1] Group coordinator 127.0.0.1:9092 (id: 2147483646 rack: null) is unavailable or invalid, will attempt rediscovery
2018-12-15 16:44:58.016 INFO 5551 --- [kafkaConsumer-3] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-4, groupId=payment-group-1] Discovered group coordinator 127.0.0.1:9092 (id: 2147483646 rack: null)
2018-12-15 16:44:58.017 INFO 5551 --- [kafkaConsumer-3] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-4, groupId=payment-group-1] (Re-)joining group
2018-12-15 16:44:58.056 INFO 5551 --- [kafkaConsumer-5] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-6, groupId=payment-group-1] Group coordinator 127.0.0.1:9092 (id: 2147483646 rack: null) is unavailable or invalid, will attempt rediscovery
2018-12-15 16:44:58.064 INFO 5551 --- [kafkaConsumer-6] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-7, groupId=payment-group-1] Group coordinator 127.0.0.1:9092 (id: 2147483646 rack: null) is unavailable or invalid, will attempt rediscovery
2018-12-15 16:44:58.072 INFO 5551 --- [kafkaConsumer-7] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-8, groupId=payment-group-1] Group coordinator 127.0.0.1:9092 (id: 2147483646 rack: null) is unavailable or invalid, will attempt rediscovery
2018-12-15 16:44:58.085 INFO 5551 --- [kafkaConsumer-8] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-9, groupId=payment-group-1] Group coordinator 127.0.0.1:9092 (id: 2147483646 rack: null) is unavailable or invalid, will attempt rediscovery
2018-12-15 16:44:58.114 INFO 5551 --- [kafkaConsumer-4] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-5, groupId=payment-group-1] Discovered group coordinator 127.0.0.1:9092 (id: 2147483646 rack: null)
2018-12-15 16:44:58.114 INFO 5551 --- [kafkaConsumer-4] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-5, groupId=payment-group-1] (Re-)joining group
2018-12-15 16:44:58.170 INFO 5551 --- [kafkaConsumer-5] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-6, groupId=payment-group-1] Discovered group coordinator 127.0.0.1:9092 (id: 2147483646 rack: null)
2018-12-15 16:44:58.171 INFO 5551 --- [kafkaConsumer-5] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-6, groupId=payment-group-1] (Re-)joining group
2018-12-15 16:44:58.185 INFO 5551 --- [kafkaConsumer-6] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-7, groupId=payment-group-1] Discovered group coordinator 127.0.0.1:9092 (id: 2147483646 rack: null)
2018-12-15 16:44:58.186 INFO 5551 --- [kafkaConsumer-6] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-7, groupId=payment-group-1] (Re-)joining group
2018-12-15 16:44:58.191 INFO 5551 --- [kafkaConsumer-7] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-8, groupId=payment-group-1] Discovered group coordinator 127.0.0.1:9092 (id: 2147483646 rack: null)
2018-12-15 16:44:58.191 INFO 5551 --- [kafkaConsumer-7] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-8, groupId=payment-group-1] (Re-)joining group
2018-12-15 16:44:58.201 INFO 5551 --- [kafkaConsumer-8] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-9, groupId=payment-group-1] Discovered group coordinator 127.0.0.1:9092 (id: 2147483646 rack: null)
2018-12-15 16:44:58.202 INFO 5551 --- [kafkaConsumer-8] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-9, groupId=payment-group-1] (Re-)joining group
While a single thread is consuming, I see the following:
bash-4.4# $KAFKA_HOME/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group payment-group-1 --describe
Warning: Consumer group 'payment-group-1' is rebalancing.
Once the existing messages have been consumed, the rebalance also completes, and I see logs like these:
2018-12-15 16:46:12.415 INFO 5551 --- [kafkaConsumer-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-2, groupId=payment-group-1] Revoking previously assigned partitions [TopicY-4, TopicY-5, TopicY-6, TopicY-7, TopicY-8, TopicY-9, TopicY-10, TopicY-11, TopicY-0, TopicY-1, TopicY-2, TopicY-3, TopicY-12, TopicY-13, TopicY-14, TopicY-15, TopicY-16, TopicY-17, TopicY-18, TopicY-19]
2018-12-15 16:46:12.415 INFO 5551 --- [kafkaConsumer-1] o.s.k.l.KafkaMessageListenerContainer : partitions revoked: [TopicY-4, TopicY-5, TopicY-6, TopicY-7, TopicY-8, TopicY-9, TopicY-10, TopicY-11, TopicY-0, TopicY-1, TopicY-2, TopicY-3, TopicY-12, TopicY-13, TopicY-14, TopicY-15, TopicY-16, TopicY-17, TopicY-18, TopicY-19]
2018-12-15 16:46:12.415 INFO 5551 --- [kafkaConsumer-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=payment-group-1] (Re-)joining group
2018-12-15 16:46:12.482 INFO 5551 --- [kafkaConsumer-6] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-7, groupId=payment-group-1] Successfully joined group with generation 25
2018-12-15 16:46:12.482 INFO 5551 --- [kafkaConsumer-3] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-4, groupId=payment-group-1] Successfully joined group with generation 25
2018-12-15 16:46:12.482 INFO 5551 --- [kafkaConsumer-7] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-8, groupId=payment-group-1] Successfully joined group with generation 25
2018-12-15 16:46:12.483 INFO 5551 --- [kafkaConsumer-7] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-8, groupId=payment-group-1] Setting newly assigned partitions []
2018-12-15 16:46:12.483 INFO 5551 --- [kafkaConsumer-7] o.s.k.l.KafkaMessageListenerContainer : partitions assigned: []
2018-12-15 16:46:12.484 INFO 5551 --- [kafkaConsumer-4] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-5, groupId=payment-group-1] Successfully joined group with generation 25
2018-12-15 16:46:12.484 INFO 5551 --- [kafkaConsumer-4] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-5, groupId=payment-group-1] Setting newly assigned partitions [TopicY-11]
2018-12-15 16:46:12.485 INFO 5551 --- [kafkaConsumer-8] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-9, groupId=payment-group-1] Successfully joined group with generation 25
2018-12-15 16:46:12.485 INFO 5551 --- [kafkaConsumer-8] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-9, groupId=payment-group-1] Setting newly assigned partitions []
2018-12-15 16:46:12.485 INFO 5551 --- [kafkaConsumer-8] o.s.k.l.KafkaMessageListenerContainer : partitions assigned: []
2018-12-15 16:46:12.486 INFO 5551 --- [kafkaConsumer-2] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-3, groupId=payment-group-1] Successfully joined group with generation 25
2018-12-15 16:46:12.486 INFO 5551 --- [kafkaConsumer-5] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-6, groupId=payment-group-1] Successfully joined group with generation 25
2018-12-15 16:46:12.486 INFO 5551 --- [kafkaConsumer-5] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-6, groupId=payment-group-1] Setting newly assigned partitions [TopicY-15]
2018-12-15 16:46:12.486 INFO 5551 --- [kafkaConsumer-2] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-3, groupId=payment-group-1] Setting newly assigned partitions [TopicY-3]
2018-12-15 16:46:12.487 INFO 5551 --- [kafkaConsumer-3] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-4, groupId=payment-group-1] Setting newly assigned partitions [TopicY-5]
2018-12-15 16:46:12.489 INFO 5551 --- [kafkaConsumer-6] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-7, groupId=payment-group-1] Setting newly assigned partitions [TopicY-19]
2018-12-15 16:46:12.489 INFO 5551 --- [kafkaConsumer-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=payment-group-1] Successfully joined group with generation 25
2018-12-15 16:46:12.497 INFO 5551 --- [kafkaConsumer-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-2, groupId=payment-group-1] Setting newly assigned partitions [TopicY-0]
2018-12-15 16:46:12.509 INFO 5551 --- [kafkaConsumer-3] o.s.k.l.KafkaMessageListenerContainer : partitions assigned: [TopicY-5]
2018-12-15 16:46:12.514 INFO 5551 --- [kafkaConsumer-6] o.s.k.l.KafkaMessageListenerContainer : partitions assigned: [TopicY-19]
2018-12-15 16:46:12.517 INFO 5551 --- [kafkaConsumer-1] o.s.k.l.KafkaMessageListenerContainer : partitions assigned: [TopicY-0]
2018-12-15 16:46:12.611 INFO 5551 --- [kafkaConsumer-2] o.s.k.l.KafkaMessageListenerContainer : partitions assigned: [TopicY-3]
2018-12-15 16:46:12.612 INFO 5551 --- [kafkaConsumer-5] o.s.k.l.KafkaMessageListenerContainer : partitions assigned: [TopicY-15]
2018-12-15 16:46:12.628 INFO 5551 --- [kafkaConsumer-4] o.s.k.l.KafkaMessageListenerContainer : partitions assigned: [TopicY-11]
2018-12-15 16:47:44.563 INFO 5551 --- [kafkaConsumer-4] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-5, groupId=payment-group-1] Attempt to heartbeat failed since group is rebalancing
2018-12-15 16:47:44.564 INFO 5551 --- [kafkaConsumer-4] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-5, groupId=payment-group-1] Revoking previously assigned partitions [TopicY-11]
2018-12-15 16:47:44.564 INFO 5551 --- [kafkaConsumer-4] o.s.k.l.KafkaMessageListenerContainer : partitions revoked: [TopicY-11]
2018-12-15 16:47:44.564 INFO 5551 --- [kafkaConsumer-4] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-5, groupId=payment-group-1] (Re-)joining group
2018-12-15 16:47:44.577 INFO 5551 --- [kafkaConsumer-5] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-6, groupId=payment-group-1] Attempt to heartbeat failed since group is rebalancing
2018-12-15 16:47:44.577 INFO 5551 --- [kafkaConsumer-2] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-3, groupId=payment-group-1] Attempt to heartbeat failed since group is rebalancing
2018-12-15 16:47:44.577 INFO 5551 --- [kafkaConsumer-5] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-6, groupId=payment-group-1] Revoking previously assigned partitions [TopicY-15]
2018-12-15 16:47:44.577 INFO 5551 --- [kafkaConsumer-2] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-3, groupId=payment-group-1] Revoking previously assigned partitions [TopicY-3]
2018-12-15 16:47:44.577 INFO 5551 --- [kafkaConsumer-5] o.s.k.l.KafkaMessageListenerContainer : partitions revoked: [TopicY-15]
2018-12-15 16:47:44.577 INFO 5551 --- [kafkaConsumer-2] o.s.k.l.KafkaMessageListenerContainer : partitions revoked: [TopicY-3]
2018-12-15 16:47:44.577 INFO 5551 --- [kafkaConsumer-5] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-6, groupId=payment-group-1] (Re-)joining group
2018-12-15 16:47:44.577 INFO 5551 --- [kafkaConsumer-2] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-3, groupId=payment-group-1] (Re-)joining group
2018-12-15 16:47:44.577 INFO 5551 --- [kafkaConsumer-7] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-8, groupId=payment-group-1] Attempt to heartbeat failed since group is rebalancing
2018-12-15 16:47:44.577 INFO 5551 --- [kafkaConsumer-7] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-8, groupId=payment-group-1] Revoking previously assigned partitions []
2018-12-15 16:47:44.578 INFO 5551 --- [kafkaConsumer-7] o.s.k.l.KafkaMessageListenerContainer : partitions revoked: []
2018-12-15 16:47:44.578 INFO 5551 --- [kafkaConsumer-7] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-8, groupId=payment-group-1] (Re-)joining group
2018-12-15 16:47:44.586 INFO 5551 --- [kafkaConsumer-3] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-4, groupId=payment-group-1] Attempt to heartbeat failed since group is rebalancing
2018-12-15 16:47:44.587 INFO 5551 --- [kafkaConsumer-6] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-7, groupId=payment-group-1] Attempt to heartbeat failed since group is rebalancing
2018-12-15 16:47:44.587 INFO 5551 --- [kafkaConsumer-3] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-4, groupId=payment-group-1] Revoking previously assigned partitions [TopicY-5]
2018-12-15 16:47:44.587 INFO 5551 --- [kafkaConsumer-6] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-7, groupId=payment-group-1] Revoking previously assigned partitions [TopicY-19]
2018-12-15 16:47:44.587 INFO 5551 --- [kafkaConsumer-3] o.s.k.l.KafkaMessageListenerContainer : partitions revoked: [TopicY-5]
2018-12-15 16:47:44.587 INFO 5551 --- [kafkaConsumer-6] o.s.k.l.KafkaMessageListenerContainer : partitions revoked: [TopicY-19]
2018-12-15 16:47:44.587 INFO 5551 --- [kafkaConsumer-3] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-4, groupId=payment-group-1] (Re-)joining group
2018-12-15 16:47:44.587 INFO 5551 --- [kafkaConsumer-6] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-7, groupId=payment-group-1] (Re-)joining group
2018-12-15 16:47:44.589 INFO 5551 --- [kafkaConsumer-8] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-9, groupId=payment-group-1] Attempt to heartbeat failed since group is rebalancing
2018-12-15 16:47:44.589 INFO 5551 --- [kafkaConsumer-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=payment-group-1] Attempt to heartbeat failed since group is rebalancing
2018-12-15 16:47:44.589 INFO 5551 --- [kafkaConsumer-8] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-9, groupId=payment-group-1] Revoking previously assigned partitions []
2018-12-15 16:47:44.589 INFO 5551 --- [kafkaConsumer-8] o.s.k.l.KafkaMessageListenerContainer : partitions revoked: []
2018-12-15 16:47:44.589 INFO 5551 --- [kafkaConsumer-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-2, groupId=payment-group-1] Revoking previously assigned partitions [TopicY-0]
2018-12-15 16:47:44.589 INFO 5551 --- [kafkaConsumer-1] o.s.k.l.KafkaMessageListenerContainer : partitions revoked: [TopicY-0]
2018-12-15 16:47:44.589 INFO 5551 --- [kafkaConsumer-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=payment-group-1] (Re-)joining group
2018-12-15 16:47:44.589 INFO 5551 --- [kafkaConsumer-8] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-9, groupId=payment-group-1] (Re-)joining group
2018-12-15 16:47:44.602 INFO 5551 --- [kafkaConsumer-3] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-4, groupId=payment-group-1] Successfully joined group with generation 26
2018-12-15 16:47:44.602 INFO 5551 --- [kafkaConsumer-6] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-7, groupId=payment-group-1] Successfully joined group with generation 26
2018-12-15 16:47:44.603 INFO 5551 --- [kafkaConsumer-3] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-4, groupId=payment-group-1] Setting newly assigned partitions [TopicY-6, TopicY-7, TopicY-8]
2018-12-15 16:47:44.603 INFO 5551 --- [kafkaConsumer-6] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-7, groupId=payment-group-1] Setting newly assigned partitions [TopicY-14, TopicY-15]
2018-12-15 16:47:44.603 INFO 5551 --- [kafkaConsumer-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=payment-group-1] Successfully joined group with generation 26
2018-12-15 16:47:44.604 INFO 5551 --- [kafkaConsumer-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-2, groupId=payment-group-1] Setting newly assigned partitions [TopicY-0, TopicY-1, TopicY-2]
2018-12-15 16:47:44.604 INFO 5551 --- [kafkaConsumer-7] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-8, groupId=payment-group-1] Successfully joined group with generation 26
2018-12-15 16:47:44.605 INFO 5551 --- [kafkaConsumer-7] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-8, groupId=payment-group-1] Setting newly assigned partitions [TopicY-16, TopicY-17]
2018-12-15 16:47:44.606 INFO 5551 --- [kafkaConsumer-2] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-3, groupId=payment-group-1] Successfully joined group with generation 26
2018-12-15 16:47:44.606 INFO 5551 --- [kafkaConsumer-2] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-3, groupId=payment-group-1] Setting newly assigned partitions [TopicY-4, TopicY-5, TopicY-3]
2018-12-15 16:47:44.610 INFO 5551 --- [kafkaConsumer-4] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-5, groupId=payment-group-1] Successfully joined group with generation 26
2018-12-15 16:47:44.610 INFO 5551 --- [kafkaConsumer-5] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-6, groupId=payment-group-1] Successfully joined group with generation 26
2018-12-15 16:47:44.611 INFO 5551 --- [kafkaConsumer-4] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-5, groupId=payment-group-1] Setting newly assigned partitions [TopicY-9, TopicY-10, TopicY-11]
2018-12-15 16:47:44.611 INFO 5551 --- [kafkaConsumer-5] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-6, groupId=payment-group-1] Setting newly assigned partitions [TopicY-12, TopicY-13]
2018-12-15 16:47:44.612 INFO 5551 --- [kafkaConsumer-8] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-9, groupId=payment-group-1] Successfully joined group with generation 26
2018-12-15 16:47:44.612 INFO 5551 --- [kafkaConsumer-8] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-9, groupId=payment-group-1] Setting newly assigned partitions [TopicY-18, TopicY-19]
2018-12-15 16:47:44.612 INFO 5551 --- [kafkaConsumer-6] o.s.k.l.KafkaMessageListenerContainer : partitions assigned: [TopicY-14, TopicY-15]
2018-12-15 16:47:44.614 INFO 5551 --- [kafkaConsumer-3] o.s.k.l.KafkaMessageListenerContainer : partitions assigned: [TopicY-6, TopicY-7, TopicY-8]
2018-12-15 16:47:44.618 INFO 5551 --- [kafkaConsumer-1] o.s.k.l.KafkaMessageListenerContainer : partitions assigned: [TopicY-0, TopicY-1, TopicY-2]
2018-12-15 16:47:44.623 INFO 5551 --- [kafkaConsumer-7] o.s.k.l.KafkaMessageListenerContainer : partitions assigned: [TopicY-16, TopicY-17]
2018-12-15 16:47:44.625 INFO 5551 --- [kafkaConsumer-2] o.s.k.l.KafkaMessageListenerContainer : partitions assigned: [TopicY-4, TopicY-5, TopicY-3]
2018-12-15 16:47:44.626 INFO 5551 --- [kafkaConsumer-8] o.s.k.l.KafkaMessageListenerContainer : partitions assigned: [TopicY-18, TopicY-19]
2018-12-15 16:47:44.628 INFO 5551 --- [kafkaConsumer-5] o.s.k.l.KafkaMessageListenerContainer : partitions assigned: [TopicY-12, TopicY-13]
2018-12-15 16:47:44.877 INFO 5551 --- [kafkaConsumer-4] o.s.k.l.KafkaMessageListenerContainer : partitions assigned: [TopicY-9, TopicY-10, TopicY-11]
I want to use Kafka in an application that needs a minimum of 100 TPS, but I am struggling with the test described above. The rebalance takes too long, and I usually end up consuming all the messages on a single thread. I cannot consume all messages on a single thread in production.
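To make the single-thread concern concrete, here is a rough sketch of the arithmetic, assuming the load is spread evenly across consumer threads. The class and method names are made up for illustration and are not part of Kafka or Spring.

```java
// Hypothetical helper for back-of-the-envelope arithmetic; not tied to any Kafka API.
class ThroughputBudgetSketch {

    // Average time (in ms) each thread may spend on one message
    // if the application as a whole must sustain targetTps messages per second.
    static double perMessageBudgetMs(int threads, int targetTps) {
        return threads * 1000.0 / targetTps;
    }

    public static void main(String[] args) {
        // The 100 TPS target comes from the question; 1 and 8 are the observed and configured thread counts.
        System.out.println("1 thread:  " + perMessageBudgetMs(1, 100) + " ms per message");
        System.out.println("8 threads: " + perMessageBudgetMs(8, 100) + " ms per message");
    }
}
```

With one thread the budget is 10 ms per message, while with all 8 consumers active it would be 80 ms, which is why the single-thread phase matters to me.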
Notes:
- I write the consumers using the @KafkaListener annotation.
- I have tried many consumer configurations.
My current configuration:
ENABLE_AUTO_COMMIT_CONFIG=false
SESSION_TIMEOUT_MS_CONFIG=90000
HEARTBEAT_INTERVAL_MS_CONFIG=4000
AckMode=AckMode.RECORD
PollTimeout=10000
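For clarity, the consumer-level part of this configuration can be sketched as a plain property map. This is only an illustration: the class and method names are hypothetical, the keys are written as the property strings that I believe the ConsumerConfig constants stand for, and AckMode and PollTimeout are left out because they are Spring container settings rather than Kafka consumer properties.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative only: class and method names are hypothetical.
class ConsumerConfigSketch {

    // Consumer properties from the list above, keyed by their Kafka property names.
    static Map<String, Object> consumerProps() {
        Map<String, Object> props = new HashMap<>();
        props.put("enable.auto.commit", false);    // ENABLE_AUTO_COMMIT_CONFIG
        props.put("session.timeout.ms", 90000);    // SESSION_TIMEOUT_MS_CONFIG
        props.put("heartbeat.interval.ms", 4000);  // HEARTBEAT_INTERVAL_MS_CONFIG
        return props;
    }

    // Sanity check based on the usual guidance that the heartbeat interval
    // should be no more than one third of the session timeout.
    static boolean heartbeatWithinThird(Map<String, Object> props) {
        int session = (Integer) props.get("session.timeout.ms");
        int heartbeat = (Integer) props.get("heartbeat.interval.ms");
        return heartbeat * 3 <= session;
    }

    public static void main(String[] args) {
        Map<String, Object> props = consumerProps();
        System.out.println(props);
        System.out.println("heartbeat within a third of session timeout: " + heartbeatWithinThird(props));
    }
}
```

The check passes for these values, so the heartbeat-to-session ratio on its own does not look like the problem.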
I would appreciate any advice on solving this problem.
By the way, if the topic has no unread messages, the rebalance does not happen, and I immediately see logs like the following:
2018-12-15 17:12:01.618 INFO 5559 --- [kafkaConsumer-8] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-9, groupId=payment-group-1] (Re-)joining group
2018-12-15 17:12:01.676 INFO 5559 --- [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat started on port(s): 8081 (http) with context path ''
2018-12-15 17:12:01.684 INFO 5559 --- [ main] o.e.k.KafkaListenerApplication : Started KafkaListenerApplication in 7.099 seconds (JVM running for 9.026)
2018-12-15 17:12:05.363 INFO 5559 --- [kafkaConsumer-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=payment-group-1] Attempt to heartbeat failed since group is rebalancing
2018-12-15 17:12:05.364 INFO 5559 --- [kafkaConsumer-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-2, groupId=payment-group-1] Revoking previously assigned partitions [TopicY-4, TopicY-5, TopicY-6, TopicY-7, TopicY-8, TopicY-9, TopicY-10, TopicY-11, TopicY-0, TopicY-1, TopicY-2, TopicY-3, TopicY-12, TopicY-13, TopicY-14, TopicY-15, TopicY-16, TopicY-17, TopicY-18, TopicY-19]
2018-12-15 17:12:05.364 INFO 5559 --- [kafkaConsumer-1] o.s.k.l.KafkaMessageListenerContainer : partitions revoked: [TopicY-4, TopicY-5, TopicY-6, TopicY-7, TopicY-8, TopicY-9, TopicY-10, TopicY-11, TopicY-0, TopicY-1, TopicY-2, TopicY-3, TopicY-12, TopicY-13, TopicY-14, TopicY-15, TopicY-16, TopicY-17, TopicY-18, TopicY-19]
2018-12-15 17:12:05.364 INFO 5559 --- [kafkaConsumer-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=payment-group-1] (Re-)joining group
2018-12-15 17:12:05.382 INFO 5559 --- [kafkaConsumer-5] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-6, groupId=payment-group-1] Successfully joined group with generation 29
2018-12-15 17:12:05.382 INFO 5559 --- [kafkaConsumer-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=payment-group-1] Successfully joined group with generation 29
2018-12-15 17:12:05.382 INFO 5559 --- [kafkaConsumer-5] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-6, groupId=payment-group-1] Setting newly assigned partitions [TopicY-12, TopicY-13]
2018-12-15 17:12:05.384 INFO 5559 --- [kafkaConsumer-2] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-3, groupId=payment-group-1] Successfully joined group with generation 29
2018-12-15 17:12:05.384 INFO 5559 --- [kafkaConsumer-7] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-8, groupId=payment-group-1] Successfully joined group with generation 29
2018-12-15 17:12:05.384 INFO 5559 --- [kafkaConsumer-2] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-3, groupId=payment-group-1] Setting newly assigned partitions [TopicY-4, TopicY-5, TopicY-3]
2018-12-15 17:12:05.384 INFO 5559 --- [kafkaConsumer-7] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-8, groupId=payment-group-1] Setting newly assigned partitions [TopicY-16, TopicY-17]
2018-12-15 17:12:05.385 INFO 5559 --- [kafkaConsumer-4] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-5, groupId=payment-group-1] Successfully joined group with generation 29
2018-12-15 17:12:05.385 INFO 5559 --- [kafkaConsumer-3] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-4, groupId=payment-group-1] Successfully joined group with generation 29
2018-12-15 17:12:05.385 INFO 5559 --- [kafkaConsumer-4] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-5, groupId=payment-group-1] Setting newly assigned partitions [TopicY-9, TopicY-10, TopicY-11]
2018-12-15 17:12:05.385 INFO 5559 --- [kafkaConsumer-8] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-9, groupId=payment-group-1] Successfully joined group with generation 29
2018-12-15 17:12:05.385 INFO 5559 --- [kafkaConsumer-3] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-4, groupId=payment-group-1] Setting newly assigned partitions [TopicY-6, TopicY-7, TopicY-8]
2018-12-15 17:12:05.385 INFO 5559 --- [kafkaConsumer-8] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-9, groupId=payment-group-1] Setting newly assigned partitions [TopicY-18, TopicY-19]
2018-12-15 17:12:05.385 INFO 5559 --- [kafkaConsumer-6] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-7, groupId=payment-group-1] Successfully joined group with generation 29
2018-12-15 17:12:05.386 INFO 5559 --- [kafkaConsumer-6] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-7, groupId=payment-group-1] Setting newly assigned partitions [TopicY-14, TopicY-15]
2018-12-15 17:12:05.389 INFO 5559 --- [kafkaConsumer-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-2, groupId=payment-group-1] Setting newly assigned partitions [TopicY-0, TopicY-1, TopicY-2]
2018-12-15 17:12:05.400 INFO 5559 --- [kafkaConsumer-4] o.s.k.l.KafkaMessageListenerContainer : partitions assigned: [TopicY-9, TopicY-10, TopicY-11]
2018-12-15 17:12:05.404 INFO 5559 --- [kafkaConsumer-8] o.s.k.l.KafkaMessageListenerContainer : partitions assigned: [TopicY-18, TopicY-19]
2018-12-15 17:12:05.404 INFO 5559 --- [kafkaConsumer-6] o.s.k.l.KafkaMessageListenerContainer : partitions assigned: [TopicY-14, TopicY-15]
2018-12-15 17:12:05.410 INFO 5559 --- [kafkaConsumer-1] o.s.k.l.KafkaMessageListenerContainer : partitions assigned: [TopicY-0, TopicY-1, TopicY-2]
2018-12-15 17:12:05.505 INFO 5559 --- [kafkaConsumer-7] o.s.k.l.KafkaMessageListenerContainer : partitions assigned: [TopicY-16, TopicY-17]
2018-12-15 17:12:05.510 INFO 5559 --- [kafkaConsumer-3] o.s.k.l.KafkaMessageListenerContainer : partitions assigned: [TopicY-6, TopicY-7, TopicY-8]
2018-12-15 17:12:05.516 INFO 5559 --- [kafkaConsumer-2] o.s.k.l.KafkaMessageListenerContainer : partitions assigned: [TopicY-4, TopicY-5, TopicY-3]
2018-12-15 17:12:05.520 INFO 5559 --- [kafkaConsumer-5] o.s.k.l.KafkaMessageListenerContainer : partitions assigned: [TopicY-12, TopicY-13]
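The final distribution in these logs (3, 3, 3, 3, 2, 2, 2, 2 partitions across the 8 consumers) is what a range-style assignment would produce for 20 partitions. Below is a minimal sketch of that arithmetic, assuming the default range assignor is in use (the configuration listed above does not set an assignment strategy). The helper is hypothetical and only mimics the split for a single topic; it is not Kafka's actual implementation.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Kafka: mimics a range-style split of one topic's partitions.
class RangeAssignmentSketch {

    // Returns, for each consumer index, the partition numbers it would own.
    static List<List<Integer>> assign(int partitions, int consumers) {
        int base = partitions / consumers;   // every consumer gets at least this many
        int extra = partitions % consumers;  // the first `extra` consumers get one more
        List<List<Integer>> result = new ArrayList<>();
        int next = 0;
        for (int c = 0; c < consumers; c++) {
            int count = base + (c < extra ? 1 : 0);
            List<Integer> owned = new ArrayList<>();
            for (int i = 0; i < count; i++) {
                owned.add(next++);
            }
            result.add(owned);
        }
        return result;
    }

    public static void main(String[] args) {
        // 20 partitions and 8 consumers, as in the question.
        List<List<Integer>> assignment = assign(20, 8);
        for (int c = 0; c < assignment.size(); c++) {
            System.out.println("consumer index " + c + " -> " + assignment.get(c));
        }
    }
}
```

For 20 partitions and 8 consumers, the first consumer gets partitions 0 to 2 and the last gets 18 and 19, which matches the assignments logged for consumer-2 and consumer-9.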