Как сохранить местоположение контрольной точки потоковой передачи Spark в S3? - PullRequest
0 голосов
/ 18 июня 2020

Меня интересует приложение Spark Streaming (Spark v2.3.2), которое получает данные паркета S3 и записывает данные паркета в S3. Поток фрейма данных приложения использует groupByKey() и flatMapGroupsWithState(), чтобы использовать GroupState.

Можно ли настроить это для использования местоположения контрольной точки s3 ? Например:

val stream = myDataset.writeStream
    .format("parquet")
    .option("path", s3DataDestination)
    .option("checkpointLocation", s3CheckpointPath)
    .option("truncate", false)
    .option(Trigger.Once)
    .outputMode(OutputMode.Append)
stream.start().awaitTermination()

Я подтвердил, что вышеуказанное может успешно записывать данные в s3DataDestination.

Однако при записи в местоположение контрольной точки s3 возникает исключение:

java.lang.IllegalStateException: Error committing version 1 into HDFSStateStore[id=(op=0, part=9), dir=s3://<my_s3_location>
    at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$HDFSBackedStateStore.commit(...)
...
Caused by: java.io.IOException: Failed to rename s3://.../checkpoint/state/0/9/temp... to s3://.../checkpoint/state/0/9/1.delta

Потребуется ли для этого индивидуальная реализация S3 StateStoreProvider? Или нужно записать местоположение контрольной точки в HDFS?

1 Ответ

1 голос
/ 18 июня 2020

Проблема в том, что частота параллелизма записи и чтения слишком высока. AWS S3 не предоставляет такой возможности.

Решение:

  • Нам пришлось переключиться на локальный постоянный диск для проверки Spark, указывая
  • S3Guard : Это сделает чтение и запись S3 более последовательным (Примечание: это экспериментальный вариант, и я лично никогда не видел его в действии)
  • Использовать HDFS
...