Я создаю одно приложение, в котором я получу потоковые данные (csv) из kafka и запишу их в hdfs. Данные будут разделены запятыми, но количество полей не фиксировано.Поскольку структура данных файла не фиксирована, я не могу применить структуру схемы при потоковой передаче.
Data format:
A
B,C,D
A,B,C,D
Моя цель - записать эти данные в виде csv в hdfs с использованием потоковой искры.
До сих пор я пробовал использовать приведенный ниже фрагмент кода. Я могу записать в формате json:
for msg in consumer:
data = msg.value
if data:
data_json =json.loads(json.dumps(data,ensure_ascii=False,indent=1),encoding='utf-8')
data_df=sqlContext.read.json(sc.parallelize([data_json]))
data_df.write.mode('append').json(hdfs_path)
consumer.commit()
Я использую spark 1.6 и Python 2.7.Любые выводы будут полезны.