Потребитель Кафки: Как программно потреблять от заданного c смещения в Go Сарама - PullRequest
1 голос
/ 04 мая 2020

Недавно я начал учиться работать с Кафкой . В проекте, над которым я работаю, используется sarama .

. Для чтения сообщений я использую ConsumerGroup.

. Мне нужно через некоторое время снова прочитать сообщение, если foo возвращает false. Как это можно сделать?

func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {

    for message := range claim.Messages() {

            if ok := foo(message); ok {
                session.MarkMessage(message, "")
            } else {
                // ???
            }

    }

    return nil
}

1 Ответ

1 голос
/ 04 мая 2020

Вы можете сбросить смещение Consumer Group на более старое смещение, включив в обратный вызов Setup() вашей Consumer Group следующее:

func (e myConsumerGroup) Setup(sess sarama.ConsumerGroupSession) error {
    sess.ResetOffset(topic, partition, offset, "")

    return nil
}

Вы также можете добиться этого через консоль:

kafka-consumer-groups \
    --bootstrap-server localhost:9092 \
    --group my-consumer-group \
    --topic myTopicName \
    --reset-offsets \
    --to-offfset 100 \
    --execute
...