Как записать потоковый DataFrame в Kafka со всеми строками в виде массива JSON? - PullRequest
0 голосов
/ 08 марта 2019

Я ищу решения для записи потоковых данных искры в kafka.Я использую следующий метод для записи данных в kafka

df.selectExpr("to_json(struct(*)) AS value").writeStream.format("kafka")

Но моя проблема заключается в том, что при записи в kafka данные отображаются следующим образом

{"country":"US","plan":postpaid,"value":300}
{"country":"CAN","plan":0.0,"value":30}

мой ожидаемый результат равен

   [
    {"country":"US","plan":postpaid,"value":300}
    {"country":"CAN","plan":0.0,"value":30}
   ]

Я хочу заключить строки в массив.Как можно добиться того же в потоковом искре?может кто-то посоветует

Ответы [ 2 ]

1 голос
/ 13 марта 2019

Я предполагаю, что схема потокового DataFrame (df) выглядит следующим образом:

root
 |-- country: string (nullable = true)
 |-- plan: string (nullable = true)
 |-- value: string (nullable = true)

Я также предполагаю, что вы хотите записать ( произвести ) все строки в потоковом фрейме данных (df) в тему Кафки как одну запись, в которой строки находятся в форме массив JSON.

Если это так, вам следует groupBy строк и collect_list, чтобы сгруппировать все строки в одну, которую вы могли бы записать в Kafka.

// df is a batch DataFrame so I could show for demo purposes
scala> df.show
+-------+--------+-----+
|country|    plan|value|
+-------+--------+-----+
|     US|postpaid|  300|
|    CAN|     0.0|   30|
+-------+--------+-----+

val jsons = df.selectExpr("to_json(struct(*)) AS value")
scala> jsons.show(truncate = false)
+------------------------------------------------+
|value                                           |
+------------------------------------------------+
|{"country":"US","plan":"postpaid","value":"300"}|
|{"country":"CAN","plan":"0.0","value":"30"}     |
+------------------------------------------------+

val grouped = jsons.groupBy().agg(collect_list("value") as "value")
scala> grouped.show(truncate = false)
+-----------------------------------------------------------------------------------------------+
|value                                                                                          |
+-----------------------------------------------------------------------------------------------+
|[{"country":"US","plan":"postpaid","value":"300"}, {"country":"CAN","plan":"0.0","value":"30"}]|
+-----------------------------------------------------------------------------------------------+

Я бы сделал все вышеперечисленное в DataStreamWriter.foreachBatch , чтобы получить DataFrame для работы.

0 голосов
/ 11 марта 2019

Я действительно не уверен, достижимо ли это, но я все равно выложу свое предложение здесь; так что вы можете преобразовать свой Dataframe впоследствии:

 //Input  
 inputDF.show(false)
 +---+-------+
 |int|string |
 +---+-------+
 |1  |string1|
 |2  |string2|
 +---+-------+

 //convert that to json
 inputDF.toJSON.show(false)
 +----------------------------+
 |value                       |
 +----------------------------+
 |{"int":1,"string":"string1"}|
 |{"int":2,"string":"string2"}|
 +----------------------------+

 //then use collect and mkString
 println(inputDF.toJSON.collect().mkString("[", "," , "]"))
 [{"int":1,"string":"string1"},{"int":2,"string":"string2"}]
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...