Я получаю данные от kafka, используя потоковую передачу pyspark, и в результате получается датафрейм, когда я преобразую фрейм данных в rdd, он ошибся:
Traceback (most recent call last):
File "/home/docs/dp_model/dp_algo_platform/dp_algo_core/test/test.py", line 36, in <module>
df = df.rdd.map(lambda x: x.value.split(" ")).toDF()
File "/home/softs/spark-2.4.3-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/sql/dataframe.py", line 91, in rdd
File "/home/softs/spark-2.4.3-bin-hadoop2.6/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
File "/home/softs/spark-2.4.3-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/sql/utils.py", line 69, in deco
pyspark.sql.utils.AnalysisException: 'Queries with streaming sources must be executed with writeStream.start();;\nkafka'
правильный код версии:
spark = SparkSession \
.builder \
.appName("StructuredNetworkWordCount") \
.getOrCreate()
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("subscribe", "test") \
.load()
df = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
df = df.withColumn("s", F.split(df['value'], " "))
df = df.withColumn('e', F.explode(df['s']))
# df = df.rdd.map(lambda x: x.value.split(" ")).toDF()
q = df.writeStream \
.format("console") \
.trigger(processingTime='30 seconds') \
.start()
q.awaitTermination()
это неправильный код версии:
spark = SparkSession \
.builder \
.appName("StructuredNetworkWordCount") \
.getOrCreate()
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("subscribe", "test") \
.load()
df = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
# df = df.withColumn("s", F.split(df['value'], " "))
# df = df.withColumn('e', F.explode(df['s']))
df = df.rdd.map(lambda x: x.value.split(" ")).toDF()
q = df.writeStream \
.format("console") \
.trigger(processingTime='30 seconds') \
.start()
q.awaitTermination()
Почему он не может преобразовать фрейм данных в rdd? и как я могу сделать, когда я хочу преобразовать dataframe в rdd в потоковой передаче pyspark?