Недавно я начал учиться работать с Кафкой . В проекте, над которым я работаю, используется sarama .
. Для чтения сообщений я использую ConsumerGroup
.
. Мне нужно через некоторое время снова прочитать сообщение, если foo
возвращает false
. Как это можно сделать?
func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
for message := range claim.Messages() {
if ok := foo(message); ok {
session.MarkMessage(message, "")
} else {
// ???
}
}
return nil
}