Я пытаюсь получить данные из темы kafka и помещаю их в папку hdfs. Я столкнулся со следующей проблемой.
После каждого сообщения (kafka) местоположение hdfs обновляется файлами деталей в формате .c000.csv. Я создал одну таблицу кустов поверх расположения HDFS, но HIVE не может читать данные, записанные с помощью spark структурированная потоковая передача.
ниже - формат файла после структурированной потоковой передачи
part-00001-abdda104-0ae2-4e8a-b2bd-3cb474081c87.c000.csv
Вот мой код для вставки:
val kafkaDatademostr = spark.readStream.format("kafka").option("kafka.bootstrap.servers","ttt.tt.tt.tt.com:8092").option("subscribe","demostream").option("kafka.security.protocol","SASL_PLAINTEXT").load
val interval=kafkaDatademostr.select(col("value").cast("string")) .alias("csv").select("csv.*")
val interval2=interval.selectExpr("split(value,',')[0] as rog" ,"split(value,',')[1] as vol","split(value,',')[2] as agh","split(value,',')[3] as aght","split(value,',')[4] as asd")
// interval2.writeStream.outputMode("append").format("console").start()
interval2.writeStream.outputMode("append").partitionBy("rog").format("csv").trigger(Trigger.ProcessingTime("30 seconds")).option("path", "hdfs://vvv/apps/hive/warehouse/area.db/test_kafcsv/").start()
Может кто-нибудь помочь мне, почему он создает такие файлы?
Если я сделаю dfs -cat /part-00001-ad35a3b6-8485-47c8-b9d2-bab2f723d840.c000.csv
, я смогу увидеть свои значения .... но он не читает с улья из-за проблемы с форматом ...