Читайте последние записи из Kafka, используя пакетную работу pyspark - PullRequest
0 голосов
/ 03 марта 2020

Я выполняю пакетное задание в pyspark, где spark будет считывать данные из kafka topi c каждые 5 минут.

df = spark \
  .read \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1") \
  .option("subscribePattern", "test") \
  .option("startingOffsets", "earliest") \
  .option("endingOffsets", "latest") \
  .load()

Всякий раз, когда spark считывает данные из kafka, он считывает все данные, включая предыдущие партии. Я хочу прочитать данные для текущей партии или последних записей, которые ранее не читались. Пожалуйста, предложите! Спасибо.

1 Ответ

1 голос
/ 03 марта 2020

С https://spark.apache.org/docs/2.4.5/structured-streaming-kafka-integration.html#creating -a-kafka-source-for-batch-query

Для пакетных запросов самое последнее (либо неявно, либо с использованием -1 в json) не допускается.

Использование самых ранних означает, что все данные снова получены.

Вам необходимо будет явно указать смещение при каждом запуске, например:

.option("startingOffsets", """{"topic1":{"0":23,"1":-2},"topic2":{"0":-2}}""")

Это означает, что вам необходимо сохранить смещения, обработанные для каждого раздела. Я в ближайшем будущем изучаю это для проекта. Ниже приведены некоторые пункты, которые помогут вам:

https://medium.com/datakaresolutions/structured-streaming-kafka-integration-6ab1b6a56dd1 с указанием того, что вы наблюдаете:

Создание пакетного запроса Kafka

  • Spark также предоставляет функцию извлечения данных из Kafka в пакетном режиме. В пакетном режиме Spark будет принимать все сообщения одновременно. Для Kafka в пакетном режиме требуются два важных параметра: начальные смещения и конечные смещения, если не указано, искра будет учитывать конфигурацию по умолчанию, а именно:

    • начальные смещения - самые ранние
    • конечные смещения - последние

https://dzone.com/articles/kafka-gt-hdfss3-batch-ingestion-through-spark также ссылается на то, что вы должны сделать, со следующим:

И, наконец, сохраните эти Kafka topi c endOffsets в файловой системе - локальной или HDFS (или передайте их в ZooKeeper). Это будет использоваться для следующего запуска запуска смещения для топки Кафки c. Здесь мы проверяем, что следующий запуск задания будет считываться из смещения, где остановился предыдущий запуск.

Этот блог https://dataengi.com/2019/06/06/spark-structured-streaming/ Я думаю, что есть ответ для сохранение смещений.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...