Экспорт данных из Pyspark Dataframe в словарь или список для дальнейшей обработки Python - PullRequest
0 голосов
/ 06 мая 2019

Я пытаюсь извлечь значения из Фрейма данных Pyspark после того, как Pyspark действительно работает, чтобы найти Подключенные компоненты, но я не понимаю, как извлечь эти данные, как вы, скажем, из списка.

Нижеупрощенная версия таблицы, созданной из большого набора данных, из которого я работаю.По сути, следующая таблица создается с использованием данных о связности вершин и ребер графов.Если номер компонента совпадает, это означает, что узлы (идентификаторы) лежат в одной и той же структуре графа.


    +---+------------+
    | id|   component|
    +---+------------+
    |  0|154618822656|
    |  1|154618822656|
    |  2|154618822656|
    |  3|154618822656|
    |  4|420906795008|
    |  5|420906795008|
    +---+------------+

Я много чего пытался извлечь данные в формы, которые я больше всегораньше нравились списки и словари.Когда я пробую различные методы в документах, я получаю вывод, такой как:

[Row(id='0', component=154618822656), Row(id='1', component=154618822656)]

, который я не уверен, как работать.Я также видел метод asDict () в Pyspark, но не могу заставить его работать даже с простой таблицей.

Это примерная функция, которая принимает графическую рамку, находит связанные компоненты и создает таблицу.Все хорошо, пока я не хочу поместить данные в другую структуру:

def get_connected_components(graphframe):
    connected_table = g.connectedComponents()
    connected_table.collect()
    conn = connected_table.rdd.take(2)
    print(conn)

В конечном итоге я хотел бы получить что-то вроде этого:

{"154618822656" : {0, 1}, "420906795008": {2, 3, 4, 5}}

, которое я бы превратил в дальнейшеенапример:

0 1
2 3 4 5

Это может быть неправильный способ работы с этими таблицами, но я новичок в Pyspark и удивлен тем, насколько это сложно даже со всеми поисками.Заранее спасибо.

1 Ответ

0 голосов
/ 06 мая 2019

Не совсем уверен, что вы пытаетесь сделать, но вот некоторые методы преобразования словаря и списка через Spark, которые должны помочь.Важно отметить, что если вы хотите использовать такие структуры, как list / dict, я предлагаю работать на одной машине (если ваш набор данных может поместиться в памяти), а не пытаться распределять вычисления через Spark только для сбора всех данных обратно.на одну машину, чтобы сделать больше обработки.Также есть несколько хороших Python-графовых пакетов, так как вы работаете с Spark GraphFrames.Надеюсь, это поможет.

# load your sample data set
data = [(0, 154618822656),\
        (1, 154618822656),\
        (2, 154618822656),\
        (3, 154618822656),\
        (4, 420906795008),\
        (5, 420906795008),]

df = spark.createDataFrame(data, ("id", "comp"))

df.show()

+---+------------+
| id|        comp|
+---+------------+
|  0|154618822656|
|  1|154618822656|
|  2|154618822656|
|  3|154618822656|
|  4|420906795008|
|  5|420906795008|
+---+------------+

# get desired format like {"154618822656" : {0, 1}, "420906795008": {2, 3, 4, 5}} from your post
from pyspark.sql.functions import collect_list

df.groupBy("comp").agg(collect_list("id").alias("id")).show()
+------------+------------+
|        comp|          id|
+------------+------------+
|154618822656|[0, 1, 2, 3]|
|420906795008|      [4, 5]|
+------------+------------+

# you can convert col to a list ***collect() is not recommended for larger datasets***
l = [i for i in df.select("id").rdd.flatMap(lambda x: x).collect()]

print(type(l))
print(l)
<class 'list'>
[0, 1, 2, 3, 4, 5]

# write to json so you can get a dictionary format like you were mentioning
df.groupBy("comp").agg(collect_list("id").alias("id")).write.json("data.json")

! cat data.json/*.json
{"comp":154618822656,"id":[0,1,2,3]}
{"comp":420906795008,"id":[4,5]}
...