Я пробую структурированную потоковую передачу в Azure Databricks, используя хранилище Databricks dbfs: / в качестве контрольной точки и места хранения файлов.Я передаю данные о событиях из концентратора событий Azure в паркетные файлы в dbfs.
Когда я отменяю запрос и перезапускаю, я получаю исключение страшного недопустимого состояния.
Причина:java.lang.IllegalStateException: dbfs: / test02 / _spark_metadata / 2 не существует при сжатии пакета 9 (compactInterval: 10)
Я пытаюсь выяснить взаимосвязь между концентраторами событий AzureПараметр начального положения, смещение контрольной точки и журналы фиксации, а также информация о месте хранения _spark_metadata.
Почему я получаю недопустимое состояние при перезапуске и как различные данные журналов и метаданных используются Spark?
Существует 10 файлов журнала смещения с именами от 0 до 9. 9 файлов журнала фиксации с именами от 0 до 8. Очевидно, что должен быть другой файл журнала фиксации, если последний пакет был обработан полностью.
Файлы журналов коммитов, по-видимому, не содержат соответствующей информации, почему я предполагаю, что это их чистое существование (имя)mportant.
commit log file:
v1
{"nextBatchWatermarkMs":0}
Журналы смещений содержат только информацию о смещениях на раздел.
{
"batchWatermarkMs": 0,
"batchTimestampMs": 1542634173589,
"conf": {
"spark.sql.shuffle.partitions": "200",
"spark.sql.streaming.stateStore.providerClass": "org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider",
"spark.sql.streaming.multipleWatermarkPolicy": "min",
"spark.sql.streaming.flatMapGroupsWithState.stateFormatVersion": "2"
}
}
{
"poc-test01": {
"1": 3666,
"0": 3664
}
}
Если я правильно понял, у структурированной потоковой передачи есть цикл, в котором она сначала записывает в журнал запись впереди.смещение / папка.Затем обработайте данные партии.Затем запишите журнал коммитов в коммит / папку.Каждый пакет запускает новый цикл.
Папка _spark_metadata / содержит только два файла журнала, 0 и 1, каждый файл содержит информацию о двух файлах паркета в каталоге filepath /.
{
"path": "dbfs:/test02/part-00000-98c5def2-bdf3-4e56-af9f-ff0796344b01-c000.snappy.parquet",
"size": 59180,
"isDir": false,
"modificationTime": 1542214122000,
"blockReplication": 1,
"blockSize": 536870912,
"action": "add"
}
{
"path": "dbfs:/test02/part-00001-a38dbf8b-240a-48d0-8438-f81f0506a79c-c000.snappy.parquet",
"size": 59462,
"isDir": false,
"modificationTime": 1542214122000,
"blockReplication": 1,
"blockSize": 536870912,
"action": "add"
}
В папке filepath / папка находится 10 файлов паркетных файлов, а концентратор событий содержит два раздела.
Теперь я лучше всего предположил, что папка _spark_metadata / используется для идемпотентности в приемнике файлов.Spark должен иметь возможность отслеживать обработанные партии от исходных позиций смещения до созданных файлов в папке filepath /, в которой находится папка _spark_metadata /.Каким-то образом это сломалось.
- Задокументировано ли где-нибудь сочетание контрольных точек и обработки метаданных хранилища файлов?Кажется, это огромный черный ящик, который так работает.
- Это ошибка структурированной потоковой передачи или ошибка контрольной точки приемника файлов?
- Как я могу вернуться к последнему рабочему состоянию?