записать фреймворк pyspark в kafka - PullRequest
1 голос
/ 17 июня 2020

У меня есть кадр данных pyspark, который я хотел записать в kafka topi c.

df.show(n=5)
+-------+---------+
|county |category |
+-------+---------+
|Albany1|  Animal3|
|Albany2|  Animal5|
|Albany3|  Animal1|
|Albany4|  Animal2|
|Albany5|  Animal4|
+-------+---------+

df.printSchema ()

root
 |-- county: string (nullable = true)
 |-- category: string (nullable = true)

код, который я пробовал

df.selectExpr("to_json(struct(*)) AS value") \
.write \
.format("kafka") \
.option("kafka.bootstrap.servers", "host:port") \
.option("topic", "test") \
.save()

Я получаю ошибку ниже.

 java.lang.RuntimeException: org.apache.spark.sql.kafka010.KafkaSourceProvider does not allow create table as select.

Пожалуйста, помогите. Спасибо !!

...