Я бы хотел загрузить таблицу Hive (target_table
) в качестве DataFrame после записи нового пакета в HDFS (target_table_dir
) с использованием Spark Structured Streaming следующим образом:
df.writeStream
.trigger(processingTime='5 seconds')
.foreachBatch(lambda df, partition_id:
df.write
.option("path", target_table_dir)
.format("parquet")
.mode("append")
.saveAsTable(target_table))
.start()
Когда мы немедленно читаем те же данные из таблицы Hive, мы получаем «исключение раздела не найдено». Если мы читаем с некоторой задержкой, у нас правильные данные.
Похоже, что Spark все еще записывает данные в HDFS, хотя выполнение остановлено, а Hive Metastore обновляется, но данные все еще записываются в HDFS.
Как узнать, когда запись данных в таблицу Hive (в HDFS) завершена?
Примечание:
мы обнаружили, что если мы используем processAllAvailable () после записи, последующее чтение работает нормально. Но processAllAvailable () навсегда блокирует выполнение, если мы имеем дело с непрерывными потоками