Потребительский параллелизм Kafka при запуске Spring Boot - PullRequest
0 голосов
/ 13 декабря 2018

Я экспериментирую на Kafka с Spring Boot.

  • Spring Boot 2.1.0.RELEASE
  • Spring-Kafka 2.2.0

MyKafkaConfig для потребителей выглядит следующим образом:

@Bean
ThreadPoolTaskExecutor messageProcessorExecutor() {
    ThreadPoolTaskExecutor exec = new ThreadPoolTaskExecutor();
    exec.setCorePoolSize(10);
    exec.setMaxPoolSize(20);
    exec.setKeepAliveSeconds(30);
    exec.setThreadNamePrefix("kafkaConsumer-");
    return exec;
}

@Bean
public ConsumerFactory<String, String> consumerFactory() {
    DefaultKafkaConsumerFactory<String, String> consumerFactory = new DefaultKafkaConsumerFactory<>(consumerConfigs());

    JsonDeserializer<String> valueDeserializer = new JsonDeserializer<>();
    valueDeserializer.addTrustedPackages("path.to.my.pkgs");
    consumerFactory.setValueDeserializer(valueDeserializer);

    consumerFactory.setKeyDeserializer(new StringDeserializer());

    return consumerFactory;
}

@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {

    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<String, String>();
    factory.setConsumerFactory(consumerFactory());
    factory.setConcurrency(10);
    factory.getContainerProperties().setPollTimeout(4000);
    factory.getContainerProperties().setConsumerTaskExecutor(messageProcessorExecutor());

    return factory;
}

private Map<String, Object> consumerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "groupId");
    return props;
}

И у меня есть один потребитель.

@KafkaListener(topics = "Topic1", groupId = "groupId")
public void consume(MyMessage message) {
    logger.info("Message is read.);
}

Как вы видите выше, я настроил параллелизм на 10. Он работает втак, как я хочу.Когда я помещаю 3 сообщения в связанную тему в Kafka, я вижу, что каждое сообщение используется разными потоками.

2018-12-12 23:41:50.416  INFO 1937 --- [kafkaConsumer-8] o.e.kafkalistener.KafkaListeners         : Message is read.
2018-12-12 23:41:50.414  INFO 1937 --- [kafkaConsumer-2] o.e.kafkalistener.KafkaListeners         : Message is read.
2018-12-12 23:41:50.461  INFO 1937 --- [kafkaConsumer-6] o.e.kafkalistener.KafkaListeners         : Message is read.

Однако потребитель работает с одним потоком при запуске приложения.Мой тестовый пример:

  1. закрытие приложения
  2. отправка еще 3 сообщений на ту же тему Kafka
  3. запуск приложения-потребителя

IЯ смотрю журналы, как указано выше:

2018-12-12 23:51:51.525  INFO 2023 --- [kafkaConsumer-1] o.e.kafkalistener.KafkaListeners         : Message is read.
2018-12-12 23:51:51.526  INFO 2023 --- [kafkaConsumer-1] o.e.kafkalistener.KafkaListeners         : Message is read.
2018-12-12 23:51:51.526  INFO 2023 --- [kafkaConsumer-1] o.e.kafkalistener.KafkaListeners         : Message is read.
2018-12-12 23:51:54.104  INFO 2023 --- [kafkaConsumer-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-2, groupId=groupId] Attempt to heartbeat failed since group is rebalancing
2018-12-12 23:51:54.139  INFO 2023 --- [kafkaConsumer-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-2, groupId=groupId] Revoking previously assigned partitions [I have deleted here to make log more readable]
2018-12-12 23:51:54.139  INFO 2023 --- [kafkaConsumer-1] o.s.k.l.KafkaMessageListenerContainer    : partitions revoked: [I have deleted here to make log more readable]
2018-12-12 23:51:54.139  INFO 2023 --- [kafkaConsumer-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-2, groupId=groupId] (Re-)joining group
2018-12-12 23:51:54.155  INFO 2023 --- [kafkaConsumer-9] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-10, groupId=groupId] Successfully joined group with generation 41
2018-12-12 23:51:54.155  INFO 2023 --- [kafkaConsumer-2] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-3, groupId=groupId] Successfully joined group with generation 41
2018-12-12 23:51:54.156  INFO 2023 --- [kafkaConsumer-9] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-10, groupId=groupId] Setting newly assigned partitions [Topic1-0, Topic1-1, Topic1-2, Topic1-3, Topic1-12, Topic1-13, Topic1-14, Topic1-15, Topic1-16, Topic1-17, Topic1-18, Topic1-19, Topic1-4, Topic1-5, Topic1-6, Topic1-7, Topic1-8, Topic1-9, Topic1-10, Topic1-11]
2018-12-12 23:51:54.156  INFO 2023 --- [kafkaConsumer-2] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-3, groupId=groupId] Setting newly assigned partitions [Topic1-60, Topic1-61, Topic1-62, Topic1-63, Topic1-64, Topic1-65, Topic1-66, Topic1-67, Topic1-76, Topic1-77, Topic1-78, Topic1-79, Topic1-68, Topic1-69, Topic1-70, Topic1-71, Topic1-72, Topic1-73, Topic1-74, Topic1-75]
2018-12-12 23:51:54.156  INFO 2023 --- [kafkaConsumer-4] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-5, groupId=groupId] Successfully joined group with generation 41
2018-12-12 23:51:54.157  INFO 2023 --- [kafkaConsumer-4] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-5, groupId=groupId] Setting newly assigned partitions [Topic1-116, Topic1-117, Topic1-118, Topic1-119, Topic1-108, Topic1-109, Topic1-110, Topic1-111, Topic1-112, Topic1-113, Topic1-114, Topic1-115, Topic1-100, Topic1-101, Topic1-102, Topic1-103, Topic1-104, Topic1-105, Topic1-106, Topic1-107]
2018-12-12 23:51:54.157  INFO 2023 --- [kafkaConsumer-6] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-7, groupId=groupId] Successfully joined group with generation 41
2018-12-12 23:51:54.157  INFO 2023 --- [kafkaConsumer-6] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-7, groupId=groupId] Setting newly assigned partitions [Topic1-156, Topic1-157, Topic1-158, Topic1-159, Topic1-148, Topic1-149, Topic1-150, Topic1-151, Topic1-152, Topic1-153, Topic1-154, Topic1-155, Topic1-140, Topic1-141, Topic1-142, Topic1-143, Topic1-144, Topic1-145, Topic1-146, Topic1-147]
2018-12-12 23:51:54.157  INFO 2023 --- [kafkaConsumer-7] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-8, groupId=groupId] Successfully joined group with generation 41
2018-12-12 23:51:54.158  INFO 2023 --- [kafkaConsumer-7] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-8, groupId=groupId] Setting newly assigned partitions [Topic1-160, Topic1-161, Topic1-162, Topic1-163, Topic1-172, Topic1-173, Topic1-174, Topic1-175, Topic1-176, Topic1-177, Topic1-178, Topic1-179, Topic1-164, Topic1-165, Topic1-166, Topic1-167, Topic1-168, Topic1-169, Topic1-170, Topic1-171]
2018-12-12 23:51:54.158  INFO 2023 --- [kafkaConsumer-5] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-6, groupId=groupId] Successfully joined group with generation 41
2018-12-12 23:51:54.158  INFO 2023 --- [kafkaConsumer-5] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-6, groupId=groupId] Setting newly assigned partitions [Topic1-124, Topic1-125, Topic1-126, Topic1-127, Topic1-128, Topic1-129, Topic1-130, Topic1-131, Topic1-120, Topic1-121, Topic1-122, Topic1-123, Topic1-132, Topic1-133, Topic1-134, Topic1-135, Topic1-136, Topic1-137, Topic1-138, Topic1-139]
2018-12-12 23:51:54.158  INFO 2023 --- [afkaConsumer-10] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-11, groupId=groupId] Successfully joined group with generation 41
2018-12-12 23:51:54.159  INFO 2023 --- [kafkaConsumer-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-2, groupId=groupId] Successfully joined group with generation 41
2018-12-12 23:51:54.159  INFO 2023 --- [afkaConsumer-10] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-11, groupId=groupId] Setting newly assigned partitions [Topic1-28, Topic1-29, Topic1-30, Topic1-31, Topic1-32, Topic1-33, Topic1-34, Topic1-35, Topic1-20, Topic1-21, Topic1-22, Topic1-23, Topic1-24, Topic1-25, Topic1-26, Topic1-27, Topic1-36, Topic1-37, Topic1-38, Topic1-39]
2018-12-12 23:51:54.159  INFO 2023 --- [kafkaConsumer-8] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-9, groupId=groupId] Successfully joined group with generation 41
2018-12-12 23:51:54.159  INFO 2023 --- [kafkaConsumer-8] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-9, groupId=groupId] Setting newly assigned partitions [Topic1-188, Topic1-189, Topic1-190, Topic1-191, Topic1-192, Topic1-193, Topic1-194, Topic1-195, Topic1-180, Topic1-181, Topic1-182, Topic1-183, Topic1-184, Topic1-185, Topic1-186, Topic1-187, Topic1-196, Topic1-197, Topic1-198, Topic1-199]
2018-12-12 23:51:54.163  INFO 2023 --- [kafkaConsumer-3] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-4, groupId=groupId] Successfully joined group with generation 41
2018-12-12 23:51:54.165  INFO 2023 --- [kafkaConsumer-3] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-4, groupId=groupId] Setting newly assigned partitions [Topic1-92, Topic1-93, Topic1-94, Topic1-95, Topic1-96, Topic1-97, Topic1-98, Topic1-99, Topic1-84, Topic1-85, Topic1-86, Topic1-87, Topic1-88, Topic1-89, Topic1-90, Topic1-91, Topic1-80, Topic1-81, Topic1-82, Topic1-83]
2018-12-12 23:51:54.189  INFO 2023 --- [kafkaConsumer-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-2, groupId=groupId] Setting newly assigned partitions [Topic1-52, Topic1-53, Topic1-54, Topic1-55, Topic1-56, Topic1-57, Topic1-58, Topic1-59, Topic1-44, Topic1-45, Topic1-46, Topic1-47, Topic1-48, Topic1-49, Topic1-50, Topic1-51, Topic1-40, Topic1-41, Topic1-42, Topic1-43]
2018-12-12 23:51:54.192  INFO 2023 --- [kafkaConsumer-1] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned: [Topic1-52, Topic1-53, Topic1-54, Topic1-55, Topic1-56, Topic1-57, Topic1-58, Topic1-59, Topic1-44, Topic1-45, Topic1-46, Topic1-47, Topic1-48, Topic1-49, Topic1-50, Topic1-51, Topic1-40, Topic1-41, Topic1-42, Topic1-43]
2018-12-12 23:51:54.278  INFO 2023 --- [kafkaConsumer-2] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned: [Topic1-60, Topic1-61, Topic1-62, Topic1-63, Topic1-64, Topic1-65, Topic1-66, Topic1-67, Topic1-76, Topic1-77, Topic1-78, Topic1-79, Topic1-68, Topic1-69, Topic1-70, Topic1-71, Topic1-72, Topic1-73, Topic1-74, Topic1-75]
2018-12-12 23:51:54.278  INFO 2023 --- [kafkaConsumer-9] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned: [Topic1-0, Topic1-1, Topic1-2, Topic1-3, Topic1-12, Topic1-13, Topic1-14, Topic1-15, Topic1-16, Topic1-17, Topic1-18, Topic1-19, Topic1-4, Topic1-5, Topic1-6, Topic1-7, Topic1-8, Topic1-9, Topic1-10, Topic1-11]
2018-12-12 23:51:54.283  INFO 2023 --- [kafkaConsumer-4] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned: [Topic1-116, Topic1-117, Topic1-118, Topic1-119, Topic1-108, Topic1-109, Topic1-110, Topic1-111, Topic1-112, Topic1-113, Topic1-114, Topic1-115, Topic1-100, Topic1-101, Topic1-102, Topic1-103, Topic1-104, Topic1-105, Topic1-106, Topic1-107]
2018-12-12 23:51:54.283  INFO 2023 --- [kafkaConsumer-6] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned: [Topic1-156, Topic1-157, Topic1-158, Topic1-159, Topic1-148, Topic1-149, Topic1-150, Topic1-151, Topic1-152, Topic1-153, Topic1-154, Topic1-155, Topic1-140, Topic1-141, Topic1-142, Topic1-143, Topic1-144, Topic1-145, Topic1-146, Topic1-147]
2018-12-12 23:51:54.283  INFO 2023 --- [kafkaConsumer-8] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned: [Topic1-188, Topic1-189, Topic1-190, Topic1-191, Topic1-192, Topic1-193, Topic1-194, Topic1-195, Topic1-180, Topic1-181, Topic1-182, Topic1-183, Topic1-184, Topic1-185, Topic1-186, Topic1-187, Topic1-196, Topic1-197, Topic1-198, Topic1-199]
2018-12-12 23:51:54.283  INFO 2023 --- [kafkaConsumer-5] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned: [Topic1-124, Topic1-125, Topic1-126, Topic1-127, Topic1-128, Topic1-129, Topic1-130, Topic1-131, Topic1-120, Topic1-121, Topic1-122, Topic1-123, Topic1-132, Topic1-133, Topic1-134, Topic1-135, Topic1-136, Topic1-137, Topic1-138, Topic1-139]
2018-12-12 23:51:54.283  INFO 2023 --- [kafkaConsumer-7] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned: [Topic1-160, Topic1-161, Topic1-162, Topic1-163, Topic1-172, Topic1-173, Topic1-174, Topic1-175, Topic1-176, Topic1-177, Topic1-178, Topic1-179, Topic1-164, Topic1-165, Topic1-166, Topic1-167, Topic1-168, Topic1-169, Topic1-170, Topic1-171]
2018-12-12 23:51:54.283  INFO 2023 --- [afkaConsumer-10] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned: [Topic1-28, Topic1-29, Topic1-30, Topic1-31, Topic1-32, Topic1-33, Topic1-34, Topic1-35, Topic1-20, Topic1-21, Topic1-22, Topic1-23, Topic1-24, Topic1-25, Topic1-26, Topic1-27, Topic1-36, Topic1-37, Topic1-38, Topic1-39]
2018-12-12 23:51:54.283  INFO 2023 --- [kafkaConsumer-3] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned: [Topic1-92, Topic1-93, Topic1-94, Topic1-95, Topic1-96, Topic1-97, Topic1-98, Topic1-99, Topic1-84, Topic1-85, Topic1-86, Topic1-87, Topic1-88, Topic1-89, Topic1-90, Topic1-91, Topic1-80, Topic1-81, Topic1-82, Topic1-83]

Я думаю, 3 непрочитанных сообщения читаются до того, как все потребители готовы, и каждое из них читается потребителем с именем kafkaConsumer-1 .Эта ситуация не изменилась, когда я выдвинул гораздо больше сообщений, когда приложение-пользователь закрыто.

Как я могу одновременно читать все непрочитанные сообщения при запуске приложения?

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...