Kafka искра интеграции - PullRequest
0 голосов
/ 15 мая 2019

Я создаю одно приложение, в котором я получу потоковые данные (csv) из kafka и запишу их в hdfs. Данные будут разделены запятыми, но количество полей не фиксировано.Поскольку структура данных файла не фиксирована, я не могу применить структуру схемы при потоковой передаче.

Data format:
A
B,C,D
A,B,C,D 

Моя цель - записать эти данные в виде csv в hdfs с использованием потоковой искры.

До сих пор я пробовал использовать приведенный ниже фрагмент кода. Я могу записать в формате json:

for msg in consumer:
  data = msg.value
  if data:
    data_json =json.loads(json.dumps(data,ensure_ascii=False,indent=1),encoding='utf-8')
    data_df=sqlContext.read.json(sc.parallelize([data_json]))
    data_df.write.mode('append').json(hdfs_path)
    consumer.commit() 

Я использую spark 1.6 и Python 2.7.Любые выводы будут полезны.

...