Spring Kafka параллелизм с Spring-интеграцией - PullRequest
0 голосов
/ 08 мая 2018

Я не могу сказать, упускаю ли я что-то очевидное, или в spring-integration-kafka:3.0.1 есть ошибка, связанная с попыткой заставить нескольких пользователей работать по одной теме. Сценарий представляет собой одну тему Kafka с 10 разделами и одним приложением springboot, которое его слушает. Соответствующая конфигурация:

application.yml:

spring:
  kafka:
    consumer:
      group-id: test-consumer
      auto-offset-reset: earliest
    listener:
      concurrency: 4

Конфигурация:

@Configuration
@EnableIntegration
@IntegrationComponentScan("com.test")
public class MessageConfig {
    @Bean
    public MessageChannel testReceiveChannel() {
        return MessageChannels.direct().get();
    }

    @Bean
    public IntegrationFlow testReceiveFlow(@Qualifier("kafkaConsumerFactory") final ConsumerFactory<?, ?> kafkaConsumer, final MessageChannel testReceiveChannel) {
        return IntegrationFlows
                .from(Kafka.messageDrivenChannelAdapter(kafkaConsumer, ListenerMode.record, "test-topic"))
                .transform(new JsonToObjectTransformer(EventMessage.class))
                .channel(testReceiveChannel)
                .get();
    }
}

Слушатель:

@Component
public class EventListener {
    private static final Logger LOG = LoggerFactory.getLogger(EventListener.class);

    @ServiceActivator(inputChannel = "testReceiveChannel")
    public void processMessage(final EventMessage message) {
        LOG.info("Got message {} on {}", message.getValue(), Thread.currentThread().getName());
    }
}

При запуске я слышу только 1 контейнер на всех 10 разделах. Я вижу, что ConcurrentKafkaListenerContainerFactory имеет правильное значение параллелизма, но кажется, что метод initializeContainer никогда не вызывается (что применило бы его к фактическому потребителю, если я правильно понял). Я, вероятно, смотрю на совершенно неправильную вещь, однако.

Есть идеи о том, что я пропускаю?

1 Ответ

0 голосов
/ 08 мая 2018

Spring Boot KafkaProperties (например, spring.kafka.listener.concurrency = 4) и упомянутый ConcurrentKafkaListenerContainerFactory применяются к @KafkaListener компонентам. Spring Integration не имеет ничего общего. По крайней мере, автоматически.

Вам нужно сделать это вручную:

Kafka.messageDrivenChannelAdapter(kafkaConsumer, ListenerMode.record, "test-topic")
      .configureListenerContainer(c ->
                        c.concurrency(this.kafkaProperties.getListener().getConcurrency()))
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...