Как создать фрейм данных для потоковой передачи кафки с помощью PySpark? - PullRequest
0 голосов
/ 21 мая 2018

Я использую kafka версии 2.11-1.0.1 и Spark версии 2.0.2.Я должен сделать dataframe для ответа kafka.Так как я могу сделать dataframe для kafkaStream?Заранее спасибо

1 Ответ

0 голосов
/ 21 мая 2018

Как вы сказали,

kvs = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer3", {topic: 1})
lines = kvs.map(lambda x: x[1])

Здесь lines - это dStream из rdds, а не одно rdd само по себе.Следовательно, чтобы получить фрейм данных, вы должны преобразовать его в dStream фреймов данных.Как то так,

lines.foreachRDD(lambda rdd: rdd.toDF())
...