Эффективное создание нового кадра данных из столбца данных pyspark - PullRequest
0 голосов
/ 26 июня 2019

Интересно, как наиболее эффективно извлечь столбец в фрейме данных pyspark и превратить его в новый фрейм данных?Следующий код работает без проблем с небольшими наборами данных, но работает очень медленно и даже вызывает ошибку нехватки памяти.Интересно, как я могу улучшить эффективность этого кода?

pdf_edges = sdf_grp.rdd.flatMap(lambda x: x).collect()  
edgelist = reduce(lambda a, b: a + b, pdf_edges, [])
sdf_edges = spark.createDataFrame(edgelist)

В фрейме данных pyspark sdf_grp , столбец "пар" содержит информацию, как показано ниже

+-------------------------------------------------------------------+
|pairs                                                              |
+-------------------------------------------------------------------+
|[[39169813, 24907492], [39169813, 19650174]]                       |
|[[10876191, 139604770]]                                            |
|[[6481958, 22689674]]                                              |
|[[73450939, 114203936], [73450939, 21226555], [73450939, 24367554]]|
|[[66306616, 32911686], [66306616, 19319140], [66306616, 48712544]] |
+-------------------------------------------------------------------+

со схемой

root
|-- pairs: array (nullable = true)
|    |-- element: struct (containsNull = true)
|    |    |-- node1: integer (nullable = false)
|    |    |-- node2: integer (nullable = false)

Я хотел бы преобразовать их в новый фрейм данных sdf_edges , как показано ниже

+---------+---------+
|    node1|    node2|
+---------+---------+
| 39169813| 24907492|
| 39169813| 19650174|
| 10876191|139604770|
|  6481958| 22689674|
| 73450939|114203936|
| 73450939| 21226555|
| 73450939| 24367554|
| 66306616| 32911686|
| 66306616| 19319140|
| 66306616| 48712544|
+---------+---------+

Ответы [ 2 ]

1 голос
/ 26 июня 2019

Самый эффективный способ извлечения столбцов - избегать collect () .Когда вы вызываете collect (), все данные передаются драйверу и обрабатываются там.Лучший способ достичь желаемого - использовать функцию explode () .Взгляните на пример ниже:

from pyspark.sql import types as T
import pyspark.sql.functions as F

schema = T.StructType([
  T.StructField("pairs", T.ArrayType(
      T.StructType([
          T.StructField("node1", T.IntegerType()),
          T.StructField("node2", T.IntegerType())
      ])
   )
   )
])


df = spark.createDataFrame(
[
([[39169813, 24907492], [39169813, 19650174]],),
([[10876191, 139604770]],        )                                    ,
([[6481958, 22689674]]      ,     )                                   ,
([[73450939, 114203936], [73450939, 21226555], [73450939, 24367554]],),
([[66306616, 32911686], [66306616, 19319140], [66306616, 48712544]],)
], schema)

df = df.select(F.explode('pairs').alias('exploded')).select('exploded.node1', 'exploded.node2')
df.show(truncate=False)

Вывод:

+--------+---------+ 
|  node1 |   node2 | 
+--------+---------+ 
|39169813|24907492 | 
|39169813|19650174 | 
|10876191|139604770| 
|6481958 |22689674 | 
|73450939|114203936| 
|73450939|21226555 | 
|73450939|24367554 | 
|66306616|32911686 | 
|66306616|19319140 | 
|66306616|48712544 | 
+--------+---------+ 
0 голосов
/ 26 июня 2019

Ну, я просто решаю это с помощью ниже

sdf_edges = sdf_grp.select('pairs').rdd.flatMap(lambda x: x[0]).toDF()
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...