Перестановка строк в кадре данных Spark - PullRequest
0 голосов
/ 29 июня 2019

Я пытаюсь создать новый столбец во фрейме данных, который является просто перетасованной версией существующего столбца.Я могу случайным образом упорядочить строки во фрейме данных, используя метод, описанный в Как перетасовать строки в фрейме данных Spark? , но когда я пытаюсь добавить перетасованную версию столбца к даннымфрейм, как представляется, не выполняет перетасовку.

import pyspark
import pyspark.sql.functions as F

spark = pyspark.sql.SparkSession.builder.getOrCreate()

df = spark.range(5).toDF("x")
df.show()
#> +---+
#> |  x|
#> +---+
#> |  0|
#> |  1|
#> |  2|
#> |  3|
#> |  4|
#> +---+

# the rows appear to be shuffled
ordered_df = df.orderBy(F.rand())
ordered_df.show()
#> +---+
#> |  x|
#> +---+
#> |  0|
#> |  2|
#> |  3|
#> |  4|
#> |  1|
#> +---+

# ...but when i try to add this column to the df, they are no longer shuffled
df.withColumn('y', ordered_df.x).show()
#> +---+---+
#> |  x|  y|
#> +---+---+
#> |  0|  0|
#> |  1|  1|
#> |  2|  2|
#> |  3|  3|
#> |  4|  4|
#> +---+---+

Создано в 2019-06-28 пакетом prexpy

Несколько замечаний:

  • Я хотел бы найти решение, где данные остаются в Spark.Например, я не хочу использовать пользовательскую функцию, требующую перемещения данных из JVM.
  • Решение в PySpark: рандомизировать строки в фрейме данных у меня не сработало (см. ниже).

df = spark.sparkContext.parallelize(range(5)).map(lambda x: (x, )).toDF(["x"])

df.withColumn('y', df.orderBy(F.rand()).x).show()
#> +---+---+
#> |  x|  y|
#> +---+---+
#> |  0|  0|
#> |  1|  1|
#> |  2|  2|
#> |  3|  3|
#> |  4|  4|
#> +---+---+
  • Я должен перетасовать строки во многих столбцах, и каждый столбец должен перемешиваться независимоиз других.Поэтому я бы предпочел не использовать решение zipWithIndex() в https://stackoverflow.com/a/45889539,, так как это решение потребовало бы, чтобы я выполнял много соединений с данными (что, я предполагаю, потребует много времени).

1 Ответ

1 голос
/ 29 июня 2019

Это можно сделать с помощью оконных функций, чтобы назначить каждой строке случайный индекс, повторив это в отдельном DF, а затем присоединившись к индексу:

>>> from pyspark.sql.window import Window
>>> import pyspark.sql.functions as F
>>> df = spark.range(5).toDF("x")
>>> left = df.withColumn("rnd", F.row_number().over(Window.orderBy(F.rand())))
>>> right = df.withColumnRenamed("x", "y").withColumn("rnd", F.row_number().over(Window.orderBy(F.rand()))) 
>>> dff = left.join(right, left.rnd == right.rnd).drop("rnd")
>>> dff.show()
19/06/29 13:17:04 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
19/06/29 13:17:04 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
+---+---+                                                                       
|  x|  y|
+---+---+
|  3|  3|
|  2|  0|
|  0|  2|
|  1|  1|
|  4|  4|
+---+---+

Как следует из предупреждения, на практике это может быть не очень хорошей идеей.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...