Я использую структурированную потоковую передачу Spark для получения событий от Kafka и загрузки их на S3.
Контрольные точки фиксируются на S3:
DataFrameWriter<Row> writer = input.writeStream()
.format("orc")
.trigger(ProcessingTime(config.getProcessingTime()))
.outputMode(OutputMode.Append())
.option("truncate", false)
.option("checkpointLocation", "s3://bucket1")
.option("compression", "zlib")
.option("path", "s3://bucket2");
Смещения фиксируются для Кафки через StreamingQueryListener
:
kafkaConsumer.commitSync(topicPartitionMap);
Как только приложение запущено, оно получает карту смещения от Kafka и запускает поток:
reader = sparkSession
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", config.getKafkaBootStrapServers())
.option("subscribe", "topic1")
.option("max.poll.records", 1000)
.option("failOnDataLoss", false)
.option("startingOffsets", topicPartitionMap)
Я храню topic/partition/offset
с данными в файлах ORC.
Данные содержат несколько дубликатов событий с точным topic/partition/offset
.
Как должен быть настроен поток для достижения ровно однократной обработки?