Spark Структурированная Потоковая пользовательская StateStoreProvide - PullRequest
0 голосов
/ 07 декабря 2018

По умолчанию задание структурированной потоковой передачи использует HDFSStateStoreProvide.Проблема с использованием хранилища HDFS в том, что он не масштабируется.Когда задание получает больше данных от kafka в часы интенсивного трафика, оно не выполняется из-за следующей ошибки:

18/12/06 15:54:35 ERROR scheduler.TaskSetManager: Task 191 in stage 231.0 failed 4 times; aborting job
18/12/06 15:54:35 ERROR streaming.StreamExecution: Query eventQuery [id = 42051afe-b1bc-438d-8143-2d7e5def717c, runId = 6201c769-b115-4b92-bad5-450b8803b88b] terminated with error
org.apache.spark.SparkException: Job aborted due to stage failure: Task 191 in stage 231.0 failed 4 times, most recent failure: Lost task 191.3 in stage 231.0 (TID 24016, sparkstreamingc1n5.host.bo1.csnzoo.com, executor 659): java.io.EOFException
    at java.io.DataInputStream.readInt(DataInputStream.java:392)
    at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$readSnapshotFile(HDFSBackedStateStoreProvider.scala:481)
    at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1.apply(HDFSBackedStateStoreProvider.scala:359)
    at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1.apply(HDFSBackedStateStoreProvider.scala:358)
    at scala.Option.getOrElse(Option.scala:121)

Как настроить пользовательское хранилище состояний?

В целях тестирования я попытался добавить поддельный класс

--conf spark.sql.streaming.stateStore.providerClass=com.streaming.state.RocksDBStateStoreProvider

Но задание по-прежнему выбирает HDFSStateStoreProvider, даже если этот класс не существует.Это ожидаемое поведение?

Могу ли я использовать любую базу данных значений ключей для записи пользовательского поставщика состояний?

Или оно ограничено только RocksDB и Cassandra.

1 Ответ

0 голосов
/ 08 декабря 2018

Как настроить пользовательское хранилище состояний?

Ваш подход к настройке настраиваемого поставщика хранилища состояний выглядит правильно, но вы не можете изменить поставщика хранилища состояний после того, как запустили запрос раньше.(Spark будет считывать конфигурацию из метаданных в контрольной точке.) Это ограничение имеет смысл, поскольку при смене поставщика хранилища состояний не гарантируется восстановление состояния.

Можно ли использовать любую базу данных значений ключей для записипользовательский поставщик состояний?

Никаких особых ограничений нет, как только ваш пользовательский поставщик состояний реализует спецификацию для поставщика хранилища состояний.Необходимо учитывать два основных момента: 1. Spark будет проверять изменения для каждого пакета. 2. Spark требует, чтобы поставщик хранилища состояний восстановил состояние в определенной версии.Ваш пользовательский провайдер состояний должен быть производительным - потому что он будет добавлять задержку для каждого пакета.

Вы также можете рассмотреть возможность (транзитивной) зависимости, добавляемой в приложение Spark из-за нестандартного провайдера состояний.

...