Я не могу сказать, упускаю ли я что-то очевидное, или в spring-integration-kafka:3.0.1
есть ошибка, связанная с попыткой заставить нескольких пользователей работать по одной теме. Сценарий представляет собой одну тему Kafka с 10 разделами и одним приложением springboot, которое его слушает. Соответствующая конфигурация:
application.yml:
spring:
kafka:
consumer:
group-id: test-consumer
auto-offset-reset: earliest
listener:
concurrency: 4
Конфигурация:
@Configuration
@EnableIntegration
@IntegrationComponentScan("com.test")
public class MessageConfig {
@Bean
public MessageChannel testReceiveChannel() {
return MessageChannels.direct().get();
}
@Bean
public IntegrationFlow testReceiveFlow(@Qualifier("kafkaConsumerFactory") final ConsumerFactory<?, ?> kafkaConsumer, final MessageChannel testReceiveChannel) {
return IntegrationFlows
.from(Kafka.messageDrivenChannelAdapter(kafkaConsumer, ListenerMode.record, "test-topic"))
.transform(new JsonToObjectTransformer(EventMessage.class))
.channel(testReceiveChannel)
.get();
}
}
Слушатель:
@Component
public class EventListener {
private static final Logger LOG = LoggerFactory.getLogger(EventListener.class);
@ServiceActivator(inputChannel = "testReceiveChannel")
public void processMessage(final EventMessage message) {
LOG.info("Got message {} on {}", message.getValue(), Thread.currentThread().getName());
}
}
При запуске я слышу только 1 контейнер на всех 10 разделах. Я вижу, что ConcurrentKafkaListenerContainerFactory
имеет правильное значение параллелизма, но кажется, что метод initializeContainer
никогда не вызывается (что применило бы его к фактическому потребителю, если я правильно понял). Я, вероятно, смотрю на совершенно неправильную вещь, однако.
Есть идеи о том, что я пропускаю?