У меня есть задание Spark Structured Streaming, которое настроено на чтение данных из Kafka. Пожалуйста, пройдите код, чтобы проверить readStream()
с параметрами, чтобы прочитать последние данные из Kafka.
Я понимаю, что readStream()
читает с первого смещения при запуске нового запроса, а не при возобновлении.
Но я не знаю, как начинать новый запрос каждый раз, когда перезапускаю свою работу в IntelliJ.
val kafkaStreamingDF = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", AppProperties.getProp(AppConstants.PROPS_SERVICES_KAFKA_SERVERS))
.option("subscribe", AppProperties.getProp(AppConstants.PROPS_SDV_KAFKA_TOPICS))
.option("failOnDataLoss", "false")
.option("startingOffsets","earliest")
.load()
.selectExpr("CAST(value as STRING)", "CAST(topic as STRING)")
Я также пытался установить смещения с помощью """{"topicA":{"0":0,"1":0}}"""
Следом идет мой писатель
val query = kafkaStreamingDF
.writeStream
.format("console")
.start()
Каждый раз, когда я перезапускаю свою работу в IntelliJ IDE, журналы показывают, что смещение было установлено как самое позднее, а не 0 или самое раннее.
Есть ли способ очистки моей контрольной точки, в этом случае я не знаю, где находится каталог контрольных точек, потому что в приведенном выше случае я не указываю никаких контрольных точек.