Один запрос на интеграцию искровой структурированной потоковой передачи с таблицей HIVE.
Я попытался привести несколько примеров потоковой структурированной искровой обработки.
вот мой пример
val spark =SparkSession.builder().appName("StatsAnalyzer")
.enableHiveSupport()
.config("hive.exec.dynamic.partition", "true")
.config("hive.exec.dynamic.partition.mode", "nonstrict")
.config("spark.sql.streaming.checkpointLocation", "hdfs://pp/apps/hive/warehouse/ab.db")
.getOrCreate()
// Register the dataframe as a Hive table
val userSchema = new StructType().add("name", "string").add("age", "integer")
val csvDF = spark.readStream.option("sep", ",").schema(userSchema).csv("file:///home/su/testdelta")
csvDF.createOrReplaceTempView("updates")
val query= spark.sql("insert into table_abcd select * from updates")
query.writeStream.start()
Как вы можете видеть на последнем шаге при записи фрейма данных в расположение hdfs, данные не вставляются в захватывающий каталог (мой существующийкаталог с некоторыми старыми данными, разделенными на «age»).
Я получаю
spark.sql.AnalysisException: запросы с потоковым источником должны выполняться с помощью writeStream start ()
Можете ли вы помочь, почему я не могу вставить данные в существующий каталог в папке hdfs?или есть какой-то другой способ, которым я могу сделать операцию "вставить в" на таблице улья?
В поисках решения