Я хотел бы создать поток, который асинхронно читает сообщение kafka и использует канал очереди для накопления количества сообщений для обработки, и только в конце обработки этих сообщений (например, 50 сообщений) он может обработать еще 50 сообщений. или как это освобождает место в этой очереди. Я попытался использовать поток, который читает из делегатов kafka в другой поток с помощью QueueChannel с PollerMetadata (Pollers.fixedDelay (500) .maxMessagesPerPoll (50)), но поллер использует один поток для чтения сообщений, я не могу выполнить параллельную обработку из 50 сообщений, если я помещу исполнителя в опросчик, он будет работать как обычный исполнитель, и он будет накапливать сообщения, и он не будет зависать в 50, пока у меня не будет нового потока, доступного ему для получения другого сообщения от kafka.
Цель состоит в том, чтобы распараллелить обработку до 50 сообщений кафки, но он снова читает только в кафке (consumer.pool), когда эта очередь освобождается, но он бесконечно читает из кафки и обрабатывает в пределах предельного количества исполнителя или опрошенного, как я могу достичь этой цели, используя поток интеграции пружины с kafka?
Только этой конфигурации достаточно для каждого потребителя topi c? в журнале всегда печатается один и тот же поток: [ntainer # 0-1- C -1] даже я задаю 10 для параллелизма
Blockquote
> Kafka.messageDrivenChannelAdapter(consumerFactory,
> topic).configureListenerContainer { kafkaMessageListenerContainer ->
> kafkaMessageListenerContainer.concurrency(concurrency)
> kafkaMessageListenerContainer.ackMode(ContainerProperties.AckMode.RECORD)
> }
> .errorChannel(IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME)