Как преобразовать кадры данных в RDDS в структурированном потоке? - PullRequest
0 голосов
/ 06 января 2020

Я получаю данные от kafka, используя потоковую передачу pyspark, и в результате получается датафрейм, когда я преобразую фрейм данных в rdd, он ошибся:

Traceback (most recent call last):
File "/home/docs/dp_model/dp_algo_platform/dp_algo_core/test/test.py", line 36, in <module>
df = df.rdd.map(lambda x: x.value.split(" ")).toDF()
File "/home/softs/spark-2.4.3-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/sql/dataframe.py", line 91, in rdd
File "/home/softs/spark-2.4.3-bin-hadoop2.6/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
File "/home/softs/spark-2.4.3-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/sql/utils.py", line 69, in deco
pyspark.sql.utils.AnalysisException: 'Queries with streaming sources must be executed with writeStream.start();;\nkafka'

правильный код версии:

spark = SparkSession \
    .builder \
    .appName("StructuredNetworkWordCount") \
    .getOrCreate()

df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "test") \
    .load()

df = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
df = df.withColumn("s", F.split(df['value'], " "))
df = df.withColumn('e', F.explode(df['s']))
# df = df.rdd.map(lambda x: x.value.split(" ")).toDF()

q = df.writeStream \
    .format("console") \
    .trigger(processingTime='30 seconds') \
    .start()

q.awaitTermination()

это неправильный код версии:

spark = SparkSession \
    .builder \
    .appName("StructuredNetworkWordCount") \
    .getOrCreate()

df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "test") \
    .load()

df = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
# df = df.withColumn("s", F.split(df['value'], " "))
# df = df.withColumn('e', F.explode(df['s']))
df = df.rdd.map(lambda x: x.value.split(" ")).toDF()

q = df.writeStream \
    .format("console") \
    .trigger(processingTime='30 seconds') \
    .start()

q.awaitTermination()

Почему он не может преобразовать фрейм данных в rdd? и как я могу сделать, когда я хочу преобразовать dataframe в rdd в потоковой передаче pyspark?

Ответы [ 4 ]

1 голос
/ 06 января 2020

Этот аспект RDD просто НЕ поддерживается. RDD являются устаревшими, а Spark структурированная потоковая передача основана на DF / DS. Общая абстракция, потоковая или пакетная.

0 голосов
/ 09 апреля 2020

Если у вас версия spark 2.4.0 и выше, вы можете использовать альтернативу ниже, чтобы поиграться с каждой строкой вашего фрейма данных.

query=df.writeStream.foreach(Customized method to work on each row of dataframe rather than RDD).outputMode("update").start()
    ssc.start()
    ssc.awaitTermination()
0 голосов
/ 07 января 2020

Для выполнения определенных c действий над полями Dataframe вы можете использовать функции UDF или даже создавать собственные Spark Custom Transformers. Но есть некоторые операции с Dataframe, которые не поддерживаются, например, преобразование в RDD.

0 голосов
/ 07 января 2020
На движке spark- sql работает структурированная потоковая передача

. Преобразование кадра данных или набора данных в RDD не поддерживается.

...