Как правильно обработать интервал опроса для весенней кафки для реализации очереди с задержкой? - PullRequest
0 голосов
/ 20 сентября 2019

Я реализую своего рода очередь задержек с kafka.Каждое сообщение, полученное контейнером слушателя Spring kafka для определенной темы (скажем, t1), должно быть отложено на определенное время (скажем, d минут), а затем сообщение msg должно быть отправлено обратно в другую тему (скажем, t2).В настоящее время я делаю это в весеннем методе контейнера слушателя kafka (AcknowledgingConsumerAwareMessageListener):

  1. получает сообщение от t1
  2. , приостанавливает контейнер слушателя
  3. сон на d минутпри необходимости
  4. возобновить контейнер слушателя
  5. отправить сообщение в t2

Я понимаю, что поток сердцебиения - это другой поток, и на него не влияют вышеуказанные шаги,но опрос происходит в том же потоке, что и обработка, и после обработки записи согласно этому ответу .Я установил для своих свойств @KafkaListener значение «max.poll.interval.ms = 2xd», чтобы оно не истекло, но я получаю NonResponsiveConsumerEvent (с timeSinceLastPoll) из KafkaEventListener.Даже если я не установил max.poll.interval.ms для свойств @KafkaListener, я все равно получаю тот же NonResponsiveConsumerEvent.В обоих случаях сообщение обрабатывается только один раз и отправляется на t2.

Вопросы

  1. Если опрос не происходит в max.poll.interval.ms, когда слушательКонтейнер приостановлен, каковы последствия этого?Как насчет того, когда контейнер не приостановлен?(Я настроил потребителя на подтверждение вручную)
  2. Должен ли я создать отдельный поток для сна и возобновления контейнера и, таким образом, освободить поток обработки контейнера для опроса?Имеет ли это значение?

версии: Spring Boot 2.1.8, Spring Kafka 2.2.8

1 Ответ

0 голосов
/ 20 сентября 2019

сон в течение d минут, если требуется

Вы не можете "уснуть" поток пользователя более чем на max.poll.interval.ms.

Весь смысл приостановкиконтейнер таков, что он продолжает poll (но никогда не получит никаких новых записей до возобновления).

Если вы на самом деле спите слушателя, нет смысла останавливать;вам просто нужно увеличить th max.poll.interval.ms соответственно.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...