Управление смещениями с помощью Spark структурированного пакетного задания с Kafka - PullRequest
0 голосов
/ 05 февраля 2019

У меня есть сценарий использования, когда я пишу пакетное задание

Мне нужно прочитать тему Кафки и записать данные в HDFS.Мой код выглядит следующим образом:

val df: DataFrame = spark.read
  .format("kafka")
  .option("subscribe", "test-topic")
  .option("includeTimestamp", true)
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("group.id", "test-cg")
  .option("checkpointLocation", "/group/test/checkpointsDir")
  .load

df.write.
  parquet(buildPathWithCurrentBatchTime())

Каждый раз, когда задание читает тему Кафки, оно начинается с самого раннего смещения и, следовательно, одно и то же сообщение регистрируется в нескольких пакетах.Как заставить мое задание читать сообщения, начиная со смещений после смещения, прочитанного предыдущим экземпляром задания.

Я попытался установить местоположение контрольной точки, идентификатор группы, но не помогло.

Iне хочу использовать потоковый запрос.У меня есть простой случай использования данных из темы Кафки.У меня нет требований к задержке.Единственным требованием является отсутствие дубликатов в журналах.Это низкий приоритет.Если я использую потоковый запрос, он будет использовать исполнителей все время, что является пустой тратой ресурсов.Поэтому я хочу сделать это в партии

1 Ответ

0 голосов
/ 21 февраля 2019

То, что вы используете - это пакетный запрос вместо потокового запроса.(Может быть, не хватает места?) Простая замена read на readStream и write на writeStream будет работать для вас.

РЕДАКТИРОВАТЬ: Как OP пояснил, что можно использовать одноразовый триггер, я только что обновилкод для использования структурированной потоковой передачи с одним временным триггером.(ОТКАЗ ОТ ОТВЕТСТВЕННОСТИ: я не скомпилировал / не запустил код, но изменение подходит для Руководства по структурированной потоковой передаче.)

val df: DataFrame = spark.readStream
  .format("kafka")
  .option("subscribe", "test-topic")
  .option("includeTimestamp", true)
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("group.id", "test-cg")
  .option("checkpointLocation", "/group/test/checkpointsDir")
  .load

val query = df.writeStream
  .format("parquet")
  .option("path", buildPathWithCurrentBatchTime())
  .trigger(Trigger.Once())
  .start()

query.awaitTermination()
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...