Есть ли способ проанализировать JSON (без разбора схемы) при чтении из Кафа в искровой структурированной потоковой передачи - PullRequest
0 голосов
/ 01 октября 2019

Я пытаюсь отправить json-строку из kafka и сохранить ее в hdfs-файлах json, используя программирование потоковой структурой spark.

Например, мой ввод выглядит следующим образом:

{
"customerId":1
"Name":"xyz"
"dept":"IT"
}

мой выводдолжны быть сохранены все данные, полученные из потоковой передачи, в одном файле через некоторый интервал.

{
"customerId":1
"Name":"xyz"
"dept":"IT"
}{
"customerId":1
"Name":"xyz"
"dept":"IT"
}

Но я получаю вывод ниже:

"value":{
"customerId":1
"Name":"xyz"
"dept":"IT"
}
"value":{
"customerId":1
"Name":"xyz"
"dept":"IT"
}

Я хочу удалить опцию значения. Я знаю, что это исходит от Кафки. Если я попробовал с схемой, она работает нормально. Но моя схема должна быть динамичной для каждого клиента с умом. Любая помощь будет оценена.

Ниже приведен пример базы кода:

val inputDf = spark.read.format("kafka").option("kafka.bootstrap.servers","hd0-dn01.com:9091").option("subscribe","topic1").load()

val dataDf = inputDf.selectExpr("CAST(value AS STRING)")

dataDf.writeStream.trigger(Trigger.ProcessingTime(10,TimeUnit.SECONDS)).format("console").option("truncate",false).outputMode("append").start().awaitTermination()
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...