структурированная потоковая передача с генерацией файлов .c000.csv - PullRequest
0 голосов
/ 15 января 2019

Я пытаюсь получить данные из темы kafka и помещаю их в папку hdfs. Я столкнулся со следующей проблемой.

После каждого сообщения (kafka) местоположение hdfs обновляется файлами деталей в формате .c000.csv. Я создал одну таблицу кустов поверх расположения HDFS, но HIVE не может читать данные, записанные с помощью spark структурированная потоковая передача.

ниже - формат файла после структурированной потоковой передачи

  part-00001-abdda104-0ae2-4e8a-b2bd-3cb474081c87.c000.csv

Вот мой код для вставки:

val kafkaDatademostr = spark.readStream.format("kafka").option("kafka.bootstrap.servers","ttt.tt.tt.tt.com:8092").option("subscribe","demostream").option("kafka.security.protocol","SASL_PLAINTEXT").load

val interval=kafkaDatademostr.select(col("value").cast("string")) .alias("csv").select("csv.*")

val interval2=interval.selectExpr("split(value,',')[0] as rog" ,"split(value,',')[1] as vol","split(value,',')[2] as agh","split(value,',')[3] as aght","split(value,',')[4] as asd")

//   interval2.writeStream.outputMode("append").format("console").start()
       interval2.writeStream.outputMode("append").partitionBy("rog").format("csv").trigger(Trigger.ProcessingTime("30 seconds")).option("path", "hdfs://vvv/apps/hive/warehouse/area.db/test_kafcsv/").start()

Может кто-нибудь помочь мне, почему он создает такие файлы?

Если я сделаю dfs -cat /part-00001-ad35a3b6-8485-47c8-b9d2-bab2f723d840.c000.csv, я смогу увидеть свои значения .... но он не читает с улья из-за проблемы с форматом ...

1 Ответ

0 голосов
/ 15 января 2019

Эти c000-файлы являются временными файлами, в которые потоковые данные записывают свои данные. Поскольку вы находитесь в режиме добавления, spark executor удерживает этот поток записи, поэтому во время выполнения вы не можете прочитать его с помощью сериализатора hive, хотя hadoop fs -cat работает.

...