Как записать несколько столбцов в спарке dataframe в очередь kafka - PullRequest
0 голосов
/ 27 мая 2019

Я знаю, что мы можем интегрировать spark с kafka и записать кадр данных в формате ключа и значения в очередь kafka, как показано ниже

df - датафрейм

 df.withColumnRenamed("Column_1", "key")
 .withColumnRenamed("Column_2", "value")
 .write()
 .format("kafka")
 .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
 .save()

Но как мне записать столбец 3,4,5 и многие в очередь kafka? Как я могу записать всю строку в очередь kafka за один раз?

Любые предложения приветствуются

1 Ответ

0 голосов
/ 28 мая 2019

Кафка получает только (ключ, значение) сформированные сообщения.Таким образом, вы должны объединить столбцы в одно значение (например, JSON).Вот пример

Это должно работать: (Построить соответствующий value_fields)

import org.apache.spark.sql.functions._

val value_fields = df.columns.filter(_ != "Column_1") 

df
.withColumnRenamed("Column_1", "key")
.withColumn("value", to_json(struct(value_fields.map(col(_)):_*)))
.select("key", "value")
.write()
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.save()
...