Я использую Spark Structured Streaming для чтения из топики Кафки c (скажем topic1) и использую SINK для записи в другие topi c (topic1-result). Я вижу, что сообщения не удаляются из Topic1 после записи в другую topi c с использованием Sink.
// Subscribe to 1 topic
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1")
.option("subscribe", "topic1")
.load()
//SINK to another topic
val ds = df
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1")
.option("checkpointLocation", "/tmp/checkpoint1")
.option("topic", "topic1-result")
.start()
В документации сказано, что мы не можем использовать автоматическую фиксацию для структурированных потоков enable.auto.commit: Kafka source doesn’t commit any offset.
но как подтвердить сообщения и удалить обработанные сообщения из топи c (topic1)