Структурная потоковая передача данных: недопустимое исключение состояния - PullRequest
0 голосов
/ 20 ноября 2018

Я пробую структурированную потоковую передачу в Azure Databricks, используя хранилище Databricks dbfs: / в качестве контрольной точки и места хранения файлов.Я передаю данные о событиях из концентратора событий Azure в паркетные файлы в dbfs.

Когда я отменяю запрос и перезапускаю, я получаю исключение страшного недопустимого состояния.

Причина:java.lang.IllegalStateException: dbfs: / test02 / _spark_metadata / 2 не существует при сжатии пакета 9 (compactInterval: 10)

Я пытаюсь выяснить взаимосвязь между концентраторами событий AzureПараметр начального положения, смещение контрольной точки и журналы фиксации, а также информация о месте хранения _spark_metadata.

Почему я получаю недопустимое состояние при перезапуске и как различные данные журналов и метаданных используются Spark?

Существует 10 файлов журнала смещения с именами от 0 до 9. 9 файлов журнала фиксации с именами от 0 до 8. Очевидно, что должен быть другой файл журнала фиксации, если последний пакет был обработан полностью.

Файлы журналов коммитов, по-видимому, не содержат соответствующей информации, почему я предполагаю, что это их чистое существование (имя)mportant.

commit log file:
v1
{"nextBatchWatermarkMs":0}

Журналы смещений содержат только информацию о смещениях на раздел.

{
    "batchWatermarkMs": 0,
    "batchTimestampMs": 1542634173589,
    "conf": {
        "spark.sql.shuffle.partitions": "200",
        "spark.sql.streaming.stateStore.providerClass": "org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider",
        "spark.sql.streaming.multipleWatermarkPolicy": "min",
        "spark.sql.streaming.flatMapGroupsWithState.stateFormatVersion": "2"
    }
}
{
    "poc-test01": {
        "1": 3666,
        "0": 3664
    }
}

Если я правильно понял, у структурированной потоковой передачи есть цикл, в котором она сначала записывает в журнал запись впереди.смещение / папка.Затем обработайте данные партии.Затем запишите журнал коммитов в коммит / папку.Каждый пакет запускает новый цикл.

Папка _spark_metadata / содержит только два файла журнала, 0 и 1, каждый файл содержит информацию о двух файлах паркета в каталоге filepath /.

{
    "path": "dbfs:/test02/part-00000-98c5def2-bdf3-4e56-af9f-ff0796344b01-c000.snappy.parquet",
    "size": 59180,
    "isDir": false,
    "modificationTime": 1542214122000,
    "blockReplication": 1,
    "blockSize": 536870912,
    "action": "add"
}
{
    "path": "dbfs:/test02/part-00001-a38dbf8b-240a-48d0-8438-f81f0506a79c-c000.snappy.parquet",
    "size": 59462,
    "isDir": false,
    "modificationTime": 1542214122000,
    "blockReplication": 1,
    "blockSize": 536870912,
    "action": "add"
}

В папке filepath / папка находится 10 файлов паркетных файлов, а концентратор событий содержит два раздела.

Теперь я лучше всего предположил, что папка _spark_metadata / используется для идемпотентности в приемнике файлов.Spark должен иметь возможность отслеживать обработанные партии от исходных позиций смещения до созданных файлов в папке filepath /, в которой находится папка _spark_metadata /.Каким-то образом это сломалось.

  • Задокументировано ли где-нибудь сочетание контрольных точек и обработки метаданных хранилища файлов?Кажется, это огромный черный ящик, который так работает.
  • Это ошибка структурированной потоковой передачи или ошибка контрольной точки приемника файлов?
  • Как я могу вернуться к последнему рабочему состоянию?
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...