Kafka commitAsync повторяет с фиксацией заказа - PullRequest
0 голосов
/ 10 ноября 2018

Я читаю Кафка, полное руководство , и в главе «Потребители» есть реклама «Повторные попытки асинхронного выполнения»:

Простой способ получить правильный порядок фиксации для асинхронных повторных попыток состоит в использовании монотонно увеличивающегося порядкового номера. Увеличивайте порядковый номер при каждой фиксации и добавляйте порядковый номер во время фиксации в обратный вызов commitAsync . Когда вы готовитесь отправить повторную попытку, проверьте, равен ли порядковый номер фиксации, полученный обратным вызовом, переменной экземпляра ; если это так, не было более новой фиксации, и безопасно повторить попытку. Если порядковый номер экземпляра выше, не пытайтесь повторить, потому что новый коммит уже отправлен.

Быстрый пример автора был бы великолепен здесь для таких плотных людей, как я. Мне особенно непонятно, о чем я говорил выше.

Может ли кто-нибудь пролить свет на то, что это значит, или даже лучше привести пример с игрушкой, демонстрирующий это?

1 Ответ

0 голосов
/ 11 ноября 2018

Вот то, что я думаю, но смирился, я могу ошибаться

      try {
        AtomicInteger atomicInteger = new AtomicInteger(0);
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(5);
            for (ConsumerRecord<String, String> record : records) {
                System.out.format("offset: %d\n", record.offset());
                System.out.format("partition: %d\n", record.partition());
                System.out.format("timestamp: %d\n", record.timestamp());
                System.out.format("timeStampType: %s\n", record.timestampType());
                System.out.format("topic: %s\n", record.topic());
                System.out.format("key: %s\n", record.key());
                System.out.format("value: %s\n", record.value());
            }

            consumer.commitAsync(new OffsetCommitCallback() {
                private int marker = atomicInteger.incrementAndGet();
                @Override
                public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets,
                                       Exception exception) {
                    if (exception != null) {
                        if (marker == atomicInteger.get()) consumer.commitAsync(this);
                    } else {
                        //Cant' try anymore
                    }
                }
            });
        }
    } catch (WakeupException e) {
        // ignore for shutdown
    } finally {
        consumer.commitSync(); //Block
        consumer.close();
        System.out.println("Closed consumer and we are done");
    }
...