У меня есть небольшой искровой кластер с одним мастером и двумя рабочими. У меня есть потоковое приложение Kafka, которое передает данные из Kafka и записывает их в каталог в формате паркета и в режиме добавления.
Пока я могу читать из потока Кафки и записывать его в файл паркета, используя следующую ключевую строку.
val streamingQuery = mydf.writeStream.format("parquet").option("path", "/root/Desktop/sampleDir/myParquet").outputMode(OutputMode.Append).option("checkpointLocation", "/root/Desktop/sampleDir/myCheckPoint").start()
Я проверил обоих работников. Было создано 3-4 файла snappy parquet с именами файлов с префиксом part-00006-XXX.snappy.parquet
.
Но когда я пытаюсь прочитать этот файл паркета, используя следующую команду:
val dfP = sqlContext.read.parquet("/root/Desktop/sampleDir/myParquet")
показывает исключение файлов, не найденных для некоторых файлов разделения паркета. Странно то, что эти файлы уже присутствуют в одном из рабочих узлов.
При дальнейшей проверке в журналах замечено, что spark пытается получить все файлы паркета только с ОДНОГО рабочего узла, и, поскольку не все файлы паркета присутствуют на одном рабочем месте, выполняется сбой, за исключением того, что эти файлы не были найдены в упомянутом пути к паркету.
Я пропустил какой-то важный шаг в потоковом запросе или при чтении данных?
ПРИМЕЧАНИЕ. У меня нет инфраструктуры HADOOP. Я хочу использовать только файловую систему.