Меня интересует приложение Spark Streaming (Spark v2.3.2), которое получает данные паркета S3 и записывает данные паркета в S3. Поток фрейма данных приложения использует groupByKey()
и flatMapGroupsWithState()
, чтобы использовать GroupState
.
Можно ли настроить это для использования местоположения контрольной точки s3 ? Например:
val stream = myDataset.writeStream
.format("parquet")
.option("path", s3DataDestination)
.option("checkpointLocation", s3CheckpointPath)
.option("truncate", false)
.option(Trigger.Once)
.outputMode(OutputMode.Append)
stream.start().awaitTermination()
Я подтвердил, что вышеуказанное может успешно записывать данные в s3DataDestination
.
Однако при записи в местоположение контрольной точки s3 возникает исключение:
java.lang.IllegalStateException: Error committing version 1 into HDFSStateStore[id=(op=0, part=9), dir=s3://<my_s3_location>
at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$HDFSBackedStateStore.commit(...)
...
Caused by: java.io.IOException: Failed to rename s3://.../checkpoint/state/0/9/temp... to s3://.../checkpoint/state/0/9/1.delta
Потребуется ли для этого индивидуальная реализация S3 StateStoreProvider
? Или нужно записать местоположение контрольной точки в HDFS?