потребитель кафки получит сообщение, которое потреблялось раньше - PullRequest
0 голосов
/ 11 июля 2019

У меня есть потребительское задание в XD, которое будет выполнено, как только он получит сообщение, созданное другим заданием производителя.И я запускаю эти работы каждый день.Я обнаружил, что иногда этот потребитель получал сообщение, которое использовалось раньше.

Регистрируется как следующее:

####OK
2019-06-28 02:06:13+0800 INFO inbound.job:Consumer_Job_In_XD-redis:queue-inbound-channel-adapter1 myConsumer.ConsumeTasklet - ==========consumed poll data ConsumerRecord(topic = my_consumer_topic, partition = 0, leaderEpoch = 0, offset = 4, CreateTime = 1561658772877, serialized key size = -1, serialized value size = 30, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = message_from_producer) ==================
2019-06-28 02:06:13+0800 INFO inbound.job:Consumer_Job_In_XD-redis:queue-inbound-channel-adapter1 myConsumer.ConsumeTasklet - ==========message is  message_from_producer, task startTime is 1561658700108, timestamp is 1561658772877 ==================

####NG
2019-06-29 17:07:14+0800 INFO inbound.job:Consumer_Job_In_XD-redis:queue-inbound-channel-adapter1 myConsumer.ConsumeTasklet - ==========consumed poll data ConsumerRecord(topic = my_consumer_topic, partition = 0, leaderEpoch = 0, offset = 0, CreateTime = 1561399136840, serialized key size = -1, serialized value size = 30, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = message_from_producer) ==================
2019-06-29 17:07:14+0800 INFO inbound.job:Consumer_Job_In_XD-redis:queue-inbound-channel-adapter1 myConsumer.ConsumeTasklet - ==========message is  message_from_producer, task startTime is 1561799100282, timestamp is 1561399136840 ==================

####OK
2019-06-29 22:16:58+0800 INFO inbound.job:Consumer_Job_In_XD-redis:queue-inbound-channel-adapter1 myConsumer.ConsumeTasklet - ==========consumed poll data ConsumerRecord(topic = my_consumer_topic, partition = 0, leaderEpoch = 2, offset = 5, CreateTime = 1561817817702, serialized key size = -1, serialized value size = 30, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = message_from_producer) ==================
2019-06-29 22:16:58+0800 INFO inbound.job:Consumer_Job_In_XD-redis:queue-inbound-channel-adapter1 myConsumer.ConsumeTasklet - ==========message is  message_from_producer, task startTime is 1561817528447, timestamp is 1561817817702 ==================

####NG
2019-07-02 02:05:09+0800 INFO inbound.job:Consumer_Job_In_XD-redis:queue-inbound-channel-adapter1 myConsumer.ConsumeTasklet - ==========consumed poll data ConsumerRecord(topic = my_consumer_topic, partition = 0, leaderEpoch = 0, offset = 0, CreateTime = 1561399136840, serialized key size = -1, serialized value size = 30, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = message_from_producer) ==================
2019-07-02 02:05:09+0800 INFO inbound.job:Consumer_Job_In_XD-redis:queue-inbound-channel-adapter1 myConsumer.ConsumeTasklet - ==========message is  message_from_producer, task startTime is 1562004300372, timestamp is 1561399136840 ==================

Похоже, что он получил offset = 0 сообщение несколько раз.

Kakfa версия (1.0.0)

Потребитель фиксирует смещение вручную. (Consumer.commitSync ();)
Устанавливать только следующие свойства:

bootstrap.servers  
auto.offset.reset=earliest  
group.id  
client.id  
    Properties config = new Properties();
    config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    config.put("auto.offset.reset", "earliest");
    config.put("group.id", group);
    config.put("client.id", config.getProperty("group.id") + "_" + System.currentTimeMillis());
    config.put("enable.auto.commit", false);
    try {
        consumer = new KafkaConsumer<>(config);
        consumer.subscribe(tList);
        while (true) {
            ConsumerRecords<?, ?> records = consumer.poll(10000);
            for (ConsumerRecord<?, ?> record : records) {
                //.........
                consumer.commitSync();
            }
            if (matched)
                break;
        }
    } finally {
        consumer.close();
    }

Ответы [ 2 ]

2 голосов
/ 11 июля 2019

В Kafka 1.1 по умолчанию смещения сохраняются только в течение 24 часов, поскольку для offsets.retention.minutes установлено значение 1440.

Таким образом, если вы остановите своего потребителя на срок более 24 часов, при перезапуске существует вероятность совершениясмещения будут удалены, что заставит потребителя использовать auto.offset.reset для поиска новой позиции.

Поскольку это было слишком коротким для многих людей, из Kafka 2.0 offsets.retention.minutes теперь установлено на 10080 (7 дней).

Вам следует изменить конфигурацию вашего брокера, чтобы разрешить сохранение смещения в течение более длительных периодов или обновить до более поздней версии Kafka.

0 голосов
/ 11 июля 2019

Попробуйте установить auto.offset.reset = latest, таким образом, после перезапуска потребитель начнет потреблять после последнего принятого смещения.

Дополнительная информация здесь https://kafka.apache.org/documentation/#consumerconfigs

...