Кафка - переместить сообщения в начало очереди в соответствии с запланированным временем - PullRequest
0 голосов
/ 18 мая 2018

Есть ли в Кафке функция (или другие очереди сообщений), с помощью которой мы можем добавлять сообщения, которые станут активными только в указанное время.Это означает, что сообщение не должно использоваться до указанного времени.

Отслеживание, есть ли способ, с помощью которого мы можем переместить эти сообщения в начало очереди, когда оно достигает запланированного времени.

Ответы [ 2 ]

0 голосов
/ 21 мая 2018

Не в самом брокере Kafka, но вы можете сделать это, используя Kafka Streams , определив пользовательский Процессор , который выполняет следующие действия:

  • сохранить каждое сообщение в StateStore, используя метку времени, когда сообщение должно быть обработано как ключ.Поскольку у нескольких сообщений может быть одна и та же временная метка, вам нужно будет обернуть их в коллекцию или добавить к ключу какое-то уникальное значение.
  • в методе init планирует пунктуацию, которая периодическивыполняет ранжированный запрос в хранилище состояний и пересылает сообщения, которые достигли своей отметки времени, и удаляет их из хранилища.
  • перенаправленные сообщения будут помещаться в отдельныйтема и потребители должны будут использовать эту тему, но сообщения будут в указанном вами порядке.
0 голосов
/ 18 мая 2018

Кафка не может этого сделать.Kafka - это, по сути, просто журнал коммитов, который будет записывать сообщения в том порядке, в котором они были получены, и читать их в том же порядке.

Однако другие более традиционные брокеры обмена сообщениями могут это сделать - например, ActiveMQ Artemis поддерживает запланированные сообщения .Я не думаю, что сообщение будет перемещаться в очереди, но оно не будет использовано раньше запланированного времени.

...