Я предполагаю, что схема потокового DataFrame (df
) выглядит следующим образом:
root
|-- country: string (nullable = true)
|-- plan: string (nullable = true)
|-- value: string (nullable = true)
Я также предполагаю, что вы хотите записать ( произвести ) все строки в потоковом фрейме данных (df
) в тему Кафки как одну запись, в которой строки находятся в форме массив JSON.
Если это так, вам следует groupBy
строк и collect_list
, чтобы сгруппировать все строки в одну, которую вы могли бы записать в Kafka.
// df is a batch DataFrame so I could show for demo purposes
scala> df.show
+-------+--------+-----+
|country| plan|value|
+-------+--------+-----+
| US|postpaid| 300|
| CAN| 0.0| 30|
+-------+--------+-----+
val jsons = df.selectExpr("to_json(struct(*)) AS value")
scala> jsons.show(truncate = false)
+------------------------------------------------+
|value |
+------------------------------------------------+
|{"country":"US","plan":"postpaid","value":"300"}|
|{"country":"CAN","plan":"0.0","value":"30"} |
+------------------------------------------------+
val grouped = jsons.groupBy().agg(collect_list("value") as "value")
scala> grouped.show(truncate = false)
+-----------------------------------------------------------------------------------------------+
|value |
+-----------------------------------------------------------------------------------------------+
|[{"country":"US","plan":"postpaid","value":"300"}, {"country":"CAN","plan":"0.0","value":"30"}]|
+-----------------------------------------------------------------------------------------------+
Я бы сделал все вышеперечисленное в DataStreamWriter.foreachBatch , чтобы получить DataFrame для работы.