Я пытаюсь отправить json-строку из kafka и сохранить ее в hdfs-файлах json, используя программирование потоковой структурой spark.
Например, мой ввод выглядит следующим образом:
{
"customerId":1
"Name":"xyz"
"dept":"IT"
}
мой выводдолжны быть сохранены все данные, полученные из потоковой передачи, в одном файле через некоторый интервал.
{
"customerId":1
"Name":"xyz"
"dept":"IT"
}{
"customerId":1
"Name":"xyz"
"dept":"IT"
}
Но я получаю вывод ниже:
"value":{
"customerId":1
"Name":"xyz"
"dept":"IT"
}
"value":{
"customerId":1
"Name":"xyz"
"dept":"IT"
}
Я хочу удалить опцию значения. Я знаю, что это исходит от Кафки. Если я попробовал с схемой, она работает нормально. Но моя схема должна быть динамичной для каждого клиента с умом. Любая помощь будет оценена.
Ниже приведен пример базы кода:
val inputDf = spark.read.format("kafka").option("kafka.bootstrap.servers","hd0-dn01.com:9091").option("subscribe","topic1").load()
val dataDf = inputDf.selectExpr("CAST(value AS STRING)")
dataDf.writeStream.trigger(Trigger.ProcessingTime(10,TimeUnit.SECONDS)).format("console").option("truncate",false).outputMode("append").start().awaitTermination()