Я реализую своего рода очередь задержек с kafka.Каждое сообщение, полученное контейнером слушателя Spring kafka для определенной темы (скажем, t1), должно быть отложено на определенное время (скажем, d минут), а затем сообщение msg должно быть отправлено обратно в другую тему (скажем, t2).В настоящее время я делаю это в весеннем методе контейнера слушателя kafka (AcknowledgingConsumerAwareMessageListener):
- получает сообщение от t1
- , приостанавливает контейнер слушателя
- сон на d минутпри необходимости
- возобновить контейнер слушателя
- отправить сообщение в t2
Я понимаю, что поток сердцебиения - это другой поток, и на него не влияют вышеуказанные шаги,но опрос происходит в том же потоке, что и обработка, и после обработки записи согласно этому ответу .Я установил для своих свойств @KafkaListener значение «max.poll.interval.ms = 2xd», чтобы оно не истекло, но я получаю NonResponsiveConsumerEvent (с timeSinceLastPoll) из KafkaEventListener.Даже если я не установил max.poll.interval.ms для свойств @KafkaListener, я все равно получаю тот же NonResponsiveConsumerEvent.В обоих случаях сообщение обрабатывается только один раз и отправляется на t2.
Вопросы
- Если опрос не происходит в max.poll.interval.ms, когда слушательКонтейнер приостановлен, каковы последствия этого?Как насчет того, когда контейнер не приостановлен?(Я настроил потребителя на подтверждение вручную)
- Должен ли я создать отдельный поток для сна и возобновления контейнера и, таким образом, освободить поток обработки контейнера для опроса?Имеет ли это значение?
версии: Spring Boot 2.1.8, Spring Kafka 2.2.8