У меня есть сценарий использования, когда я пишу пакетное задание
Мне нужно прочитать тему Кафки и записать данные в HDFS.Мой код выглядит следующим образом:
val df: DataFrame = spark.read
.format("kafka")
.option("subscribe", "test-topic")
.option("includeTimestamp", true)
.option("kafka.bootstrap.servers", "localhost:9092")
.option("group.id", "test-cg")
.option("checkpointLocation", "/group/test/checkpointsDir")
.load
df.write.
parquet(buildPathWithCurrentBatchTime())
Каждый раз, когда задание читает тему Кафки, оно начинается с самого раннего смещения и, следовательно, одно и то же сообщение регистрируется в нескольких пакетах.Как заставить мое задание читать сообщения, начиная со смещений после смещения, прочитанного предыдущим экземпляром задания.
Я попытался установить местоположение контрольной точки, идентификатор группы, но не помогло.
Iне хочу использовать потоковый запрос.У меня есть простой случай использования данных из темы Кафки.У меня нет требований к задержке.Единственным требованием является отсутствие дубликатов в журналах.Это низкий приоритет.Если я использую потоковый запрос, он будет использовать исполнителей все время, что является пустой тратой ресурсов.Поэтому я хочу сделать это в партии