Как только вы пишете collectAsList();
, вы больше не используете Spark, просто необработанный Kafka Java API.
Я бы предложил использовать Spark Structured Streaming Integration Kafka , и вы можете сделать
Вот пример, и вам нужно сформировать DataFrame как минимум с двумя столбцамипотому что Кафка берет ключи и ценности.
// Write key-value data from a DataFrame to a specific Kafka topic specified in an option
data.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.write
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("topic", "topic_name")
.save()
Что касается получения данных в JSON, опять же, collectToList()
неверно.Не тяните данные в один узел.
Вы можете использовать data.map()
для преобразования набора данных из одного формата в другой.
Например, вы должны отобразить строку в строку в формате JSON.
row -> "{\"f0\":" + row.get(0) + "}"