I am experimenting with Kafka and Spring Boot.
- Spring Boot 2.1.0.RELEASE
- Spring-Kafka 2.2.0
MyKafkaConfig for the consumers looks like this:
@Bean
ThreadPoolTaskExecutor messageProcessorExecutor() {
    ThreadPoolTaskExecutor exec = new ThreadPoolTaskExecutor();
    exec.setCorePoolSize(10);
    exec.setMaxPoolSize(20);
    exec.setKeepAliveSeconds(30);
    exec.setThreadNamePrefix("kafkaConsumer-");
    return exec;
}

@Bean
public ConsumerFactory<String, String> consumerFactory() {
    DefaultKafkaConsumerFactory<String, String> consumerFactory = new DefaultKafkaConsumerFactory<>(consumerConfigs());
    JsonDeserializer<String> valueDeserializer = new JsonDeserializer<>();
    valueDeserializer.addTrustedPackages("path.to.my.pkgs");
    consumerFactory.setValueDeserializer(valueDeserializer);
    consumerFactory.setKeyDeserializer(new StringDeserializer());
    return consumerFactory;
}

@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<String, String>();
    factory.setConsumerFactory(consumerFactory());
    factory.setConcurrency(10);
    factory.getContainerProperties().setPollTimeout(4000);
    factory.getContainerProperties().setConsumerTaskExecutor(messageProcessorExecutor());
    return factory;
}

private Map<String, Object> consumerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "groupId");
    return props;
}
And I have a single consumer:
@KafkaListener(topics = "Topic1", groupId = "groupId")
public void consume(MyMessage message) {
    logger.info("Message is read.");
}
As you can see above, I set the concurrency to 10, and it works the way I want. When I put 3 messages on the topic in Kafka, I see that each message is consumed by a different thread.
2018-12-12 23:41:50.416 INFO 1937 --- [kafkaConsumer-8] o.e.kafkalistener.KafkaListeners : Message is read.
2018-12-12 23:41:50.414 INFO 1937 --- [kafkaConsumer-2] o.e.kafkalistener.KafkaListeners : Message is read.
2018-12-12 23:41:50.461 INFO 1937 --- [kafkaConsumer-6] o.e.kafkalistener.KafkaListeners : Message is read.
However, at application startup the consumer works with a single thread. My test case:
- shut down the application
- send 3 more messages to the same Kafka topic
- start the consumer application
I then see the following logs:
2018-12-12 23:51:51.525 INFO 2023 --- [kafkaConsumer-1] o.e.kafkalistener.KafkaListeners : Message is read.
2018-12-12 23:51:51.526 INFO 2023 --- [kafkaConsumer-1] o.e.kafkalistener.KafkaListeners : Message is read.
2018-12-12 23:51:51.526 INFO 2023 --- [kafkaConsumer-1] o.e.kafkalistener.KafkaListeners : Message is read.
2018-12-12 23:51:54.104 INFO 2023 --- [kafkaConsumer-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=groupId] Attempt to heartbeat failed since group is rebalancing
2018-12-12 23:51:54.139 INFO 2023 --- [kafkaConsumer-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-2, groupId=groupId] Revoking previously assigned partitions [I have deleted here to make log more readable]
2018-12-12 23:51:54.139 INFO 2023 --- [kafkaConsumer-1] o.s.k.l.KafkaMessageListenerContainer : partitions revoked: [I have deleted here to make log more readable]
2018-12-12 23:51:54.139 INFO 2023 --- [kafkaConsumer-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=groupId] (Re-)joining group
2018-12-12 23:51:54.155 INFO 2023 --- [kafkaConsumer-9] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-10, groupId=groupId] Successfully joined group with generation 41
2018-12-12 23:51:54.155 INFO 2023 --- [kafkaConsumer-2] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-3, groupId=groupId] Successfully joined group with generation 41
2018-12-12 23:51:54.156 INFO 2023 --- [kafkaConsumer-9] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-10, groupId=groupId] Setting newly assigned partitions [Topic1-0, Topic1-1, Topic1-2, Topic1-3, Topic1-12, Topic1-13, Topic1-14, Topic1-15, Topic1-16, Topic1-17, Topic1-18, Topic1-19, Topic1-4, Topic1-5, Topic1-6, Topic1-7, Topic1-8, Topic1-9, Topic1-10, Topic1-11]
2018-12-12 23:51:54.156 INFO 2023 --- [kafkaConsumer-2] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-3, groupId=groupId] Setting newly assigned partitions [Topic1-60, Topic1-61, Topic1-62, Topic1-63, Topic1-64, Topic1-65, Topic1-66, Topic1-67, Topic1-76, Topic1-77, Topic1-78, Topic1-79, Topic1-68, Topic1-69, Topic1-70, Topic1-71, Topic1-72, Topic1-73, Topic1-74, Topic1-75]
2018-12-12 23:51:54.156 INFO 2023 --- [kafkaConsumer-4] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-5, groupId=groupId] Successfully joined group with generation 41
2018-12-12 23:51:54.157 INFO 2023 --- [kafkaConsumer-4] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-5, groupId=groupId] Setting newly assigned partitions [Topic1-116, Topic1-117, Topic1-118, Topic1-119, Topic1-108, Topic1-109, Topic1-110, Topic1-111, Topic1-112, Topic1-113, Topic1-114, Topic1-115, Topic1-100, Topic1-101, Topic1-102, Topic1-103, Topic1-104, Topic1-105, Topic1-106, Topic1-107]
2018-12-12 23:51:54.157 INFO 2023 --- [kafkaConsumer-6] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-7, groupId=groupId] Successfully joined group with generation 41
2018-12-12 23:51:54.157 INFO 2023 --- [kafkaConsumer-6] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-7, groupId=groupId] Setting newly assigned partitions [Topic1-156, Topic1-157, Topic1-158, Topic1-159, Topic1-148, Topic1-149, Topic1-150, Topic1-151, Topic1-152, Topic1-153, Topic1-154, Topic1-155, Topic1-140, Topic1-141, Topic1-142, Topic1-143, Topic1-144, Topic1-145, Topic1-146, Topic1-147]
2018-12-12 23:51:54.157 INFO 2023 --- [kafkaConsumer-7] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-8, groupId=groupId] Successfully joined group with generation 41
2018-12-12 23:51:54.158 INFO 2023 --- [kafkaConsumer-7] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-8, groupId=groupId] Setting newly assigned partitions [Topic1-160, Topic1-161, Topic1-162, Topic1-163, Topic1-172, Topic1-173, Topic1-174, Topic1-175, Topic1-176, Topic1-177, Topic1-178, Topic1-179, Topic1-164, Topic1-165, Topic1-166, Topic1-167, Topic1-168, Topic1-169, Topic1-170, Topic1-171]
2018-12-12 23:51:54.158 INFO 2023 --- [kafkaConsumer-5] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-6, groupId=groupId] Successfully joined group with generation 41
2018-12-12 23:51:54.158 INFO 2023 --- [kafkaConsumer-5] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-6, groupId=groupId] Setting newly assigned partitions [Topic1-124, Topic1-125, Topic1-126, Topic1-127, Topic1-128, Topic1-129, Topic1-130, Topic1-131, Topic1-120, Topic1-121, Topic1-122, Topic1-123, Topic1-132, Topic1-133, Topic1-134, Topic1-135, Topic1-136, Topic1-137, Topic1-138, Topic1-139]
2018-12-12 23:51:54.158 INFO 2023 --- [afkaConsumer-10] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-11, groupId=groupId] Successfully joined group with generation 41
2018-12-12 23:51:54.159 INFO 2023 --- [kafkaConsumer-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=groupId] Successfully joined group with generation 41
2018-12-12 23:51:54.159 INFO 2023 --- [afkaConsumer-10] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-11, groupId=groupId] Setting newly assigned partitions [Topic1-28, Topic1-29, Topic1-30, Topic1-31, Topic1-32, Topic1-33, Topic1-34, Topic1-35, Topic1-20, Topic1-21, Topic1-22, Topic1-23, Topic1-24, Topic1-25, Topic1-26, Topic1-27, Topic1-36, Topic1-37, Topic1-38, Topic1-39]
2018-12-12 23:51:54.159 INFO 2023 --- [kafkaConsumer-8] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-9, groupId=groupId] Successfully joined group with generation 41
2018-12-12 23:51:54.159 INFO 2023 --- [kafkaConsumer-8] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-9, groupId=groupId] Setting newly assigned partitions [Topic1-188, Topic1-189, Topic1-190, Topic1-191, Topic1-192, Topic1-193, Topic1-194, Topic1-195, Topic1-180, Topic1-181, Topic1-182, Topic1-183, Topic1-184, Topic1-185, Topic1-186, Topic1-187, Topic1-196, Topic1-197, Topic1-198, Topic1-199]
2018-12-12 23:51:54.163 INFO 2023 --- [kafkaConsumer-3] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-4, groupId=groupId] Successfully joined group with generation 41
2018-12-12 23:51:54.165 INFO 2023 --- [kafkaConsumer-3] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-4, groupId=groupId] Setting newly assigned partitions [Topic1-92, Topic1-93, Topic1-94, Topic1-95, Topic1-96, Topic1-97, Topic1-98, Topic1-99, Topic1-84, Topic1-85, Topic1-86, Topic1-87, Topic1-88, Topic1-89, Topic1-90, Topic1-91, Topic1-80, Topic1-81, Topic1-82, Topic1-83]
2018-12-12 23:51:54.189 INFO 2023 --- [kafkaConsumer-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-2, groupId=groupId] Setting newly assigned partitions [Topic1-52, Topic1-53, Topic1-54, Topic1-55, Topic1-56, Topic1-57, Topic1-58, Topic1-59, Topic1-44, Topic1-45, Topic1-46, Topic1-47, Topic1-48, Topic1-49, Topic1-50, Topic1-51, Topic1-40, Topic1-41, Topic1-42, Topic1-43]
2018-12-12 23:51:54.192 INFO 2023 --- [kafkaConsumer-1] o.s.k.l.KafkaMessageListenerContainer : partitions assigned: [Topic1-52, Topic1-53, Topic1-54, Topic1-55, Topic1-56, Topic1-57, Topic1-58, Topic1-59, Topic1-44, Topic1-45, Topic1-46, Topic1-47, Topic1-48, Topic1-49, Topic1-50, Topic1-51, Topic1-40, Topic1-41, Topic1-42, Topic1-43]
2018-12-12 23:51:54.278 INFO 2023 --- [kafkaConsumer-2] o.s.k.l.KafkaMessageListenerContainer : partitions assigned: [Topic1-60, Topic1-61, Topic1-62, Topic1-63, Topic1-64, Topic1-65, Topic1-66, Topic1-67, Topic1-76, Topic1-77, Topic1-78, Topic1-79, Topic1-68, Topic1-69, Topic1-70, Topic1-71, Topic1-72, Topic1-73, Topic1-74, Topic1-75]
2018-12-12 23:51:54.278 INFO 2023 --- [kafkaConsumer-9] o.s.k.l.KafkaMessageListenerContainer : partitions assigned: [Topic1-0, Topic1-1, Topic1-2, Topic1-3, Topic1-12, Topic1-13, Topic1-14, Topic1-15, Topic1-16, Topic1-17, Topic1-18, Topic1-19, Topic1-4, Topic1-5, Topic1-6, Topic1-7, Topic1-8, Topic1-9, Topic1-10, Topic1-11]
2018-12-12 23:51:54.283 INFO 2023 --- [kafkaConsumer-4] o.s.k.l.KafkaMessageListenerContainer : partitions assigned: [Topic1-116, Topic1-117, Topic1-118, Topic1-119, Topic1-108, Topic1-109, Topic1-110, Topic1-111, Topic1-112, Topic1-113, Topic1-114, Topic1-115, Topic1-100, Topic1-101, Topic1-102, Topic1-103, Topic1-104, Topic1-105, Topic1-106, Topic1-107]
2018-12-12 23:51:54.283 INFO 2023 --- [kafkaConsumer-6] o.s.k.l.KafkaMessageListenerContainer : partitions assigned: [Topic1-156, Topic1-157, Topic1-158, Topic1-159, Topic1-148, Topic1-149, Topic1-150, Topic1-151, Topic1-152, Topic1-153, Topic1-154, Topic1-155, Topic1-140, Topic1-141, Topic1-142, Topic1-143, Topic1-144, Topic1-145, Topic1-146, Topic1-147]
2018-12-12 23:51:54.283 INFO 2023 --- [kafkaConsumer-8] o.s.k.l.KafkaMessageListenerContainer : partitions assigned: [Topic1-188, Topic1-189, Topic1-190, Topic1-191, Topic1-192, Topic1-193, Topic1-194, Topic1-195, Topic1-180, Topic1-181, Topic1-182, Topic1-183, Topic1-184, Topic1-185, Topic1-186, Topic1-187, Topic1-196, Topic1-197, Topic1-198, Topic1-199]
2018-12-12 23:51:54.283 INFO 2023 --- [kafkaConsumer-5] o.s.k.l.KafkaMessageListenerContainer : partitions assigned: [Topic1-124, Topic1-125, Topic1-126, Topic1-127, Topic1-128, Topic1-129, Topic1-130, Topic1-131, Topic1-120, Topic1-121, Topic1-122, Topic1-123, Topic1-132, Topic1-133, Topic1-134, Topic1-135, Topic1-136, Topic1-137, Topic1-138, Topic1-139]
2018-12-12 23:51:54.283 INFO 2023 --- [kafkaConsumer-7] o.s.k.l.KafkaMessageListenerContainer : partitions assigned: [Topic1-160, Topic1-161, Topic1-162, Topic1-163, Topic1-172, Topic1-173, Topic1-174, Topic1-175, Topic1-176, Topic1-177, Topic1-178, Topic1-179, Topic1-164, Topic1-165, Topic1-166, Topic1-167, Topic1-168, Topic1-169, Topic1-170, Topic1-171]
2018-12-12 23:51:54.283 INFO 2023 --- [afkaConsumer-10] o.s.k.l.KafkaMessageListenerContainer : partitions assigned: [Topic1-28, Topic1-29, Topic1-30, Topic1-31, Topic1-32, Topic1-33, Topic1-34, Topic1-35, Topic1-20, Topic1-21, Topic1-22, Topic1-23, Topic1-24, Topic1-25, Topic1-26, Topic1-27, Topic1-36, Topic1-37, Topic1-38, Topic1-39]
2018-12-12 23:51:54.283 INFO 2023 --- [kafkaConsumer-3] o.s.k.l.KafkaMessageListenerContainer : partitions assigned: [Topic1-92, Topic1-93, Topic1-94, Topic1-95, Topic1-96, Topic1-97, Topic1-98, Topic1-99, Topic1-84, Topic1-85, Topic1-86, Topic1-87, Topic1-88, Topic1-89, Topic1-90, Topic1-91, Topic1-80, Topic1-81, Topic1-82, Topic1-83]
I think the 3 unread messages are read before all the consumers are ready, and each of them is read by the consumer on the thread named kafkaConsumer-1. The situation did not change when I pushed many more messages while the consumer application was shut down.
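To make my guess concrete, here is a minimal sketch in plain Java with no Kafka dependency. It assumes a range-style assignment: members are sorted lexicographically and each gets a contiguous block of partitions. I have not verified which assignor is actually in effect in my setup, and the class `RangeAssignmentSketch` and its `assign` helper are hypothetical names for illustration, not a Kafka or Spring API. If only one member has joined the group when the first assignment is made, it owns all 200 partitions, so every pending message lands on that one thread. Once all 10 members have joined, each owns 20 partitions, and the sketch reproduces the ranges in my logs (for example Topic1-40 to Topic1-59 for consumer-2).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class RangeAssignmentSketch {

    // Hypothetical helper (not a Kafka API): splits partitions 0..partitionCount-1
    // into contiguous ranges, one per member, with members in lexicographic order.
    static Map<String, List<Integer>> assign(List<String> members, int partitionCount) {
        // TreeMap keeps member ids sorted as strings, so "consumer-10" sorts before "consumer-2".
        Map<String, List<Integer>> result = new TreeMap<>();
        for (String member : members) {
            result.put(member, new ArrayList<>());
        }
        int perMember = partitionCount / result.size();
        int extra = partitionCount % result.size();
        int next = 0;
        for (List<Integer> owned : result.values()) {
            // The first 'extra' members receive one additional partition.
            int count = perMember + (extra-- > 0 ? 1 : 0);
            for (int i = 0; i < count; i++) {
                owned.add(next++);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Assumed startup situation: only one member has joined so far.
        Map<String, List<Integer>> early = assign(Arrays.asList("consumer-2"), 200);
        System.out.println("early: consumer-2 owns " + early.get("consumer-2").size() + " partitions");

        // After the rebalance: all ten members (consumer-2 .. consumer-11) have joined.
        List<String> all = new ArrayList<>();
        for (int i = 2; i <= 11; i++) {
            all.add("consumer-" + i);
        }
        Map<String, List<Integer>> full = assign(all, 200);
        for (Map.Entry<String, List<Integer>> e : full.entrySet()) {
            List<Integer> owned = e.getValue();
            System.out.println(e.getKey() + ": " + owned.get(0) + ".." + owned.get(owned.size() - 1));
        }
    }
}
```

If this model is right, the messages I sent while the application was down were all fetched by the first member before the rebalance at 23:51:54 spread the partitions across the other nine.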
How can I read all the unread messages concurrently at application startup?