Как преобразовать столбец spark Streaming dataframe в список Python - PullRequest
0 голосов
/ 21 ноября 2019

У меня есть поток данных искрового потока, как показано ниже. Я хочу преобразовать

+----------------+-------+-----------+-----------------+
|application_name|     id|syntheticid|          Journey|
+----------------+-------+-----------+-----------------+
|            test|   24  |   12392234|      Activation |
|            test|   24  |   12392234|          LOAD   |
+----------------+-------+-----------+-----------------+
  1. Как преобразовать это в обычный фрейм данных?

  2. Как преобразовать столбец потокового фрейма данных всписок? Например, я хочу преобразовать путешествие по столбцу в список Python ['Activation','Load'].

Любая помощь будет оценена.

1 Ответ

0 голосов
/ 21 ноября 2019

Что касается вашего первого пункта, вы не задаете правильный вопрос. Начиная с Spark 2.0, API-интерфейсы в основном перекрываются, поэтому Spark Streaming DataFrame по сути то же самое, что и Spark (SQL) DataFrame, хотя Spark Streaming DataFrame не ограничен.

Начиная с Spark 2.0, DataFrames и Datasets могут представлять статические ограниченные данные, а также потоковые неограниченные данные.

Следовательно, вы должны иметь возможность выполнять большинствоВаши необходимые манипуляции с вашим (потоковым) DataFrame.

С точки зрения вашего второго момента, попробуйте взглянуть на функции агрегирования, такие как collect_list() и collect_set(). Попробуйте этот код:

from pyspark.sql import SparkSession
from pyspark.sql import functions as f

>>> spark = SparkSession.builder.getOrCreate()
>>> df = spark._sc.parallelize([
 ["test","24","12392234","Activation"], 
 ["test","24","12392234","Load"]]
).toDF(["application_name","id","syntheticid","journey"])

>>> df.show()
+----------------+---+-----------+----------+
|application_name| id|syntheticid|   journey|
+----------------+---+-----------+----------+
|            test| 24|   12392234|Activation|
|            test| 24|   12392234|      Load|
+----------------+---+-----------+----------+


>>> grouped_df = df.groupBy('application_name').agg(f.collect_list('journey').alias('collection'))
>>> grouped_df.show()
+----------------+------------------+
|application_name|        collection|
+----------------+------------------+
|            test|[Activation, Load]|
+----------------+------------------+


>>> python_list = [item for sublist in [row.collection for row in grouped_df.collect()] for item in sublist]

>>> python_list
['Activation', 'Load']
...