Что касается вашего первого пункта, вы не задаете правильный вопрос. Начиная с Spark 2.0, API-интерфейсы в основном перекрываются, поэтому Spark Streaming DataFrame по сути то же самое, что и Spark (SQL) DataFrame, хотя Spark Streaming DataFrame не ограничен.
Начиная с Spark 2.0, DataFrames и Datasets могут представлять статические ограниченные данные, а также потоковые неограниченные данные.
Следовательно, вы должны иметь возможность выполнять большинствоВаши необходимые манипуляции с вашим (потоковым) DataFrame.
С точки зрения вашего второго момента, попробуйте взглянуть на функции агрегирования, такие как collect_list()
и collect_set()
. Попробуйте этот код:
from pyspark.sql import SparkSession
from pyspark.sql import functions as f
>>> spark = SparkSession.builder.getOrCreate()
>>> df = spark._sc.parallelize([
["test","24","12392234","Activation"],
["test","24","12392234","Load"]]
).toDF(["application_name","id","syntheticid","journey"])
>>> df.show()
+----------------+---+-----------+----------+
|application_name| id|syntheticid| journey|
+----------------+---+-----------+----------+
| test| 24| 12392234|Activation|
| test| 24| 12392234| Load|
+----------------+---+-----------+----------+
>>> grouped_df = df.groupBy('application_name').agg(f.collect_list('journey').alias('collection'))
>>> grouped_df.show()
+----------------+------------------+
|application_name| collection|
+----------------+------------------+
| test|[Activation, Load]|
+----------------+------------------+
>>> python_list = [item for sublist in [row.collection for row in grouped_df.collect()] for item in sublist]
>>> python_list
['Activation', 'Load']