По умолчанию задание структурированной потоковой передачи использует HDFSStateStoreProvide
.Проблема с использованием хранилища HDFS в том, что он не масштабируется.Когда задание получает больше данных от kafka в часы интенсивного трафика, оно не выполняется из-за следующей ошибки:
18/12/06 15:54:35 ERROR scheduler.TaskSetManager: Task 191 in stage 231.0 failed 4 times; aborting job
18/12/06 15:54:35 ERROR streaming.StreamExecution: Query eventQuery [id = 42051afe-b1bc-438d-8143-2d7e5def717c, runId = 6201c769-b115-4b92-bad5-450b8803b88b] terminated with error
org.apache.spark.SparkException: Job aborted due to stage failure: Task 191 in stage 231.0 failed 4 times, most recent failure: Lost task 191.3 in stage 231.0 (TID 24016, sparkstreamingc1n5.host.bo1.csnzoo.com, executor 659): java.io.EOFException
at java.io.DataInputStream.readInt(DataInputStream.java:392)
at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$readSnapshotFile(HDFSBackedStateStoreProvider.scala:481)
at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1.apply(HDFSBackedStateStoreProvider.scala:359)
at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1.apply(HDFSBackedStateStoreProvider.scala:358)
at scala.Option.getOrElse(Option.scala:121)
Как настроить пользовательское хранилище состояний?
В целях тестирования я попытался добавить поддельный класс
--conf spark.sql.streaming.stateStore.providerClass=com.streaming.state.RocksDBStateStoreProvider
Но задание по-прежнему выбирает HDFSStateStoreProvider, даже если этот класс не существует.Это ожидаемое поведение?
Могу ли я использовать любую базу данных значений ключей для записи пользовательского поставщика состояний?
Или оно ограничено только RocksDB
и Cassandra
.