Как использовать maxOffsetsPerTrigger в структурированной потоковой передаче pyspark? - PullRequest
0 голосов
/ 26 июня 2018

Я хочу ограничить скорость при извлечении данных из кафки.Мой код выглядит так:

df = spark.read.format('kafka') \
        .option("kafka.bootstrap.servers",'...')\
        .option("subscribe",'A') \
        .option("startingOffsets",'''{"A":{"0":200,"1":200,"2":200}}''') \
        .option("endingOffsets",'''{"A":{"0":400,"1":400,"2":400}}''') \
        .option("maxOffsetsPerTrigger",20) \
        .load() \
        .cache()

Однако, когда я звоню df.count(), результат равен 600. Я ожидал, что это 20. Кто-нибудь знает, почему maxOffsetsPerTrigger не работает.

1 Ответ

0 голосов
/ 26 июня 2018

Вы приносите 200 записей на каждый раздел (0, 1, 2), всего 600 записей.

Как вы можете видеть здесь:

Используйте параметр maxOffsetsPerTrigger дляограничить число записей, извлекаемых за триггер.

Это означает, что для каждого процесса триггера или выборки Kafka получит 20 записей, но в общей сложности вы все равно получите все записи, заданные в конфигурации (200 на раздел).

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...