Как создать поток весенней интеграции, считываемый с kafka, с процессом ограничения очереди (параллельно) - PullRequest
1 голос
/ 10 апреля 2020

Я хотел бы создать поток, который асинхронно читает сообщение kafka и использует канал очереди для накопления количества сообщений для обработки, и только в конце обработки этих сообщений (например, 50 сообщений) он может обработать еще 50 сообщений. или как это освобождает место в этой очереди. Я попытался использовать поток, который читает из делегатов kafka в другой поток с помощью QueueChannel с PollerMetadata (Pollers.fixedDelay (500) .maxMessagesPerPoll (50)), но поллер использует один поток для чтения сообщений, я не могу выполнить параллельную обработку из 50 сообщений, если я помещу исполнителя в опросчик, он будет работать как обычный исполнитель, и он будет накапливать сообщения, и он не будет зависать в 50, пока у меня не будет нового потока, доступного ему для получения другого сообщения от kafka.

Цель состоит в том, чтобы распараллелить обработку до 50 сообщений кафки, но он снова читает только в кафке (consumer.pool), когда эта очередь освобождается, но он бесконечно читает из кафки и обрабатывает в пределах предельного количества исполнителя или опрошенного, как я могу достичь этой цели, используя поток интеграции пружины с kafka?

Только этой конфигурации достаточно для каждого потребителя topi c? в журнале всегда печатается один и тот же поток: [ntainer # 0-1- C -1] даже я задаю 10 для параллелизма

Blockquote

> Kafka.messageDrivenChannelAdapter(consumerFactory,
> topic).configureListenerContainer { kafkaMessageListenerContainer ->  
> kafkaMessageListenerContainer.concurrency(concurrency)               
> kafkaMessageListenerContainer.ackMode(ContainerProperties.AckMode.RECORD)
> }
>                 .errorChannel(IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME)

1 Ответ

1 голос
/ 10 апреля 2020

Вы никогда не должны использовать канал очереди или выполнять асинхронную обработку c с Kafka. Слишком сложно отслеживать смещения внутри темы / разделов. Вы рискуете потерять сообщения.

Вместо этого, чтобы увеличить параллелизм, увеличьте количество разделов в topi c и установите параллелизм контейнера слушателя, чтобы получить необходимое количество потребителей (например, 50).

Обычно у вас должно быть больше разделов, чем у потребителей, но вам нужно как минимум столько же, потому что только один потребитель в группе может потреблять из раздела.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...