Какое свойство kafka определяет частоту опроса для KafkaConsumer? - PullRequest
1 голос
/ 25 мая 2019

Я пытаюсь понять kafka в некоторых деталях относительно потоков kafka (клиент потока kafka для kafka).

Я понимаю, что KafkConsumer (клиент java) будет получать данные из kafka, однако я не могучтобы понять, с какой частотой клиент опрашивает какфа тему для получения данных?

1 Ответ

0 голосов
/ 26 мая 2019

Частота опроса определяется вашим кодом, потому что вы несете ответственность за вызов опроса.Очень наивный пример пользовательского кода, использующего KafkaConsumer, похож на после

public class KafkaConsumerExample {
  ...


    static void runConsumer() throws InterruptedException {
        final Consumer<Long, String> consumer = createConsumer();

        final int giveUp = 100;   int noRecordsCount = 0;

        while (true) {
            final ConsumerRecords<Long, String> consumerRecords =
                    consumer.poll(1000);

            if (consumerRecords.count()==0) {
                noRecordsCount++;
                if (noRecordsCount > giveUp) break;
                else continue;
            }

            consumerRecords.forEach(record -> {
                System.out.printf("Consumer Record:(%d, %s, %d, %d)\n",
                        record.key(), record.value(),
                        record.partition(), record.offset());
            });

            consumer.commitAsync();
        }
        consumer.close();
        System.out.println("DONE");
    }
}

В этом случае частота определяется продолжительностью обработки сообщений в consumerRecords.forEach.

Однако имейте в виду, что если вы не называете опрос «достаточно быстрым», ваш потребитель будет считаться мёртвым координатором брокера и произойдет перебалансировка.Это «достаточно быстро» определяется свойством max.poll.interval.ms в kafka> = 0.10.1.0.См. этот ответ для получения более подробной информации.

max.poll.interval.ms значение по умолчанию составляет пять минут, поэтому, если ваш consumerRecords.forEach займет больше времени, чем ваш потребитель будет считаться мертвым.

Если вы не хотите использовать необработанный KafkaConsumer напрямую, вы можете использовать alpakka kafka , библиотеку для получения и выдачи тем kafka safe и методом обратного давления (основывается на потоках akka).
В этой библиотеке частота опроса определяется конфигурацией akka.kafka.consumer.poll-interval.
Мы говорим, что это безопасно, так как он будет продолжать опрос, чтобы потребитель считался мертвым, даже если ваша обработка можетне отставать от ставки.Это возможно, потому что KafkaConsumer позволяет приостановить работу потребителя

 /**
     * Suspend fetching from the requested partitions. Future calls to {@link #poll(Duration)} will not return
     * any records from these partitions until they have been resumed using {@link #resume(Collection)}.
     * Note that this method does not affect partition subscription. In particular, it does not cause a group
     * rebalance when automatic assignment is used.
     * @param partitions The partitions which should be paused
     * @throws IllegalStateException if any of the provided partitions are not currently assigned to this consumer
     */
    @Override
    public void pause(Collection<TopicPartition> partitions) { ... }

Чтобы полностью понять это, вам следует прочитать об akka-streams и backpressure.

...