Эффективное создание большого DataFrame распределенным образом в pyspark (без pyspark. sql .Row) - PullRequest
2 голосов
/ 25 мая 2020

Проблема сводится к следующему: я хочу сгенерировать DataFrame в pyspark, используя существующий распараллеленный набор входных данных, и функция, которая при одном входе может генерировать относительно большой пакет строк. В приведенном ниже примере я хочу сгенерировать фрейм данных 10 ^ 12 строк, используя, например, 1000 исполнителей:

def generate_data(one_integer):
  import numpy as np
  from pyspark.sql import Row
  M = 10000000 # number of values to generate per seed, e.g. 10M
  np.random.seed(one_integer)
  np_array = np.random.random_sample(M) # generates an array of M random values
  row_type = Row("seed", "n", "x")
  return [row_type(one_integer, i, float(np_array[i])) for i in range(M)]

N = 100000 # number of seeds to try, e.g. 100K
list_of_integers = [i for i in range(N)]
list_of_integers_rdd = spark.sparkContext.parallelize(list_of_integers)
row_rdd = list_of_integers_rdd.flatMap(list_of_integers_rdd)
from pyspark.sql.types import StructType, StructField, FloatType, IntegerType
my_schema = StructType([
       StructField("seed", IntegerType()),
       StructField("n", IntegerType()),
       StructField("x", FloatType())])
df = spark.createDataFrame(row_rdd, schema=my_schema)

(я действительно не хочу изучать распределение случайных чисел с заданным семенем - это просто пример, которым я был смог придумать, чтобы проиллюстрировать ситуацию, когда большой фрейм данных не загружается со склада, а генерируется кодом)

Приведенный выше код делает в значительной степени именно то, что я хочу. Проблема в том, что он делает это очень неэффективно - за счет создания объекта python Row для каждой строки, а затем преобразования объектов python Row во внутреннее столбцовое представление Spark.

Есть ли способ Я могу преобразовать пакет строк уже в столбцовом представлении (например, один или несколько массивов numpy, как указано выше np_array), просто сообщив Spark, что это столбцы пакета значений?

Например, я может написать код для создания RDD коллекции python, где каждый элемент представляет собой pyarrow.RecordBatch или pandas .DataFrame, но я не могу найти способ преобразовать любой из них в Spark DataFrame без создания RDD строки pyspark объектов в процессе.

Существует по крайней мере дюжина статей с примерами того, как я могу использовать pyarrow + pandas для эффективного преобразования локального (в драйвер) pandas фрейма данных в фрейм данных Spark, но это не вариант для меня, потому что мне нужно, чтобы данные были фактически сгенерированы распределенным способом по исполнителям, а не генерировались pandas dataframe на драйвере и отправка его исполнителям.

UPD. Я нашел один способ избежать создания объектов Row - используя RDD python кортежей. Как и ожидалось, это все еще слишком медленно, но все же немного быстрее, чем при использовании объектов Row. Тем не менее, это не совсем то, что я ищу (это действительно эффективный способ передачи столбчатых данных в Spark из python).

Также измерено время для выполнения определенных операций на машине (грубый способ с довольно небольшими вариациями в измеренном времени, но, на мой взгляд, он репрезентативен): рассматриваемый набор данных составляет 10M строк, 3 столбца (один столбец является постоянным целым числом, другой - целочисленным диапазоном от 0 до 10M-1, третий - плавающим значение точки, сгенерированное с использованием np.random.random_sample:

  • Локально сгенерировать pandas фрейм данных (10M строк): ~ 440-450 мс
  • Локально создать python список искр. sql .Row объекты (10 млн строк): ~ 12-15 с
  • Локально генерировать python список кортежей, представляющих строки (10 млн строк): ~ 3,4-3,5 с

Создать фрейм данных Spark с использованием всего 1 исполнителя и 1 начального начального значения:

  • с использованием spark.createDataFrame(row_rdd, schema=my_schema): ~ 70-80 с
  • с использованием spark.createDataFrame(tuple_rdd, schema=my_schema): ~ 40-45 с
  • ( нераспределенное создание) с использованием spark.createDataFrame(pandas_df, schema=my_schema): ~ 0,4-0,5 с (без самой генерации pandas df, которая занимает примерно такое же время) - с spark.sql.execution.arrow.enabled, установленным в значение true.

Пример с локальным для драйвера pandas фрейм данных, преобразованный в фрейм данных Spark за ~ 1 с для 10M строк, дает мне повод для полагаю, то же самое должно быть возможно с фреймами данных, созданными в исполнителях. Однако самое быстрое, что я могу достичь сейчас, составляет ~ 40 секунд для 10M строк с использованием RDD из python кортежей.

Итак, вопрос все еще остается - есть ли способ эффективно сгенерировать большой фрейм данных Spark распределенным образом в pyspark?

Ответы [ 3 ]

1 голос
/ 01 июня 2020

Похоже, узким местом является преобразование из RDD -> Dataframes, и что данная функция довольно быстрая, и что pandas преобразования DF в Spark DF через pyarrow выполняются довольно быстро. Вот два возможных решения:

  1. Поскольку параллельно легко создать pandas df, вместо того, чтобы возвращать его из исполнителя, напишите полученный df, используя df.to_parquet, ie:
def generate_data(seed):
    M = 10
    np.random.seed(seed)
    np_array = np.random.random_sample(M) # generates an array of M random values
    df = pd.DataFrame(np_array, columns=["x"])
    df["seed"] = seed
    df.reset_index().to_parquet(f"s3://bucket/part-{str(seed).zfill(5)}.parquet"

Чтение искр в полученных паркетных файлах впоследствии должно быть тривиальным. Тогда вашим узким местом станут ограничения ввода-вывода, которые должны быть быстрее, чем искровое преобразование кортежей / типов строк.

Если вам не разрешено сохранять что-либо в файл, pandas_udf и GROUPED_MAP могут помочь вам, если ваша версия Spark достаточно свежая. Он также использует pyarrow для преобразования между искровыми DF и pandas DF, поэтому он должен быть быстрее, чем использование кортежей, и позволяет вам создавать и возвращать pandas DF из вашего UDF распределенным образом.
import numpy as np
import pandas as pd
from pyspark.sql.functions import pandas_udf, PandasUDFType

N = 10

df = spark.createDataFrame(
    [(i,) for i in range(N)], ["seed"]
)

def generate_data(seed):
    M = 10
    np.random.seed(seed)
    np_array = np.random.random_sample(M) # generates an array of M random values
    df = pd.DataFrame(np_array, columns=["x"])
    df["seed"] = seed
    return df.reset_index()

@pandas_udf("index long, x double, seed long", PandasUDFType.GROUPED_MAP)
def generate_data_udf(pdf):
    output = []
    for idx, row in pdf.iterrows():
        output.append(generate_data(row["seed"]))
    return pd.concat(output)


df.groupby("seed").apply(generate_data_udf).show()

Более медленной частью будет groupby, который вы можете ускорить в зависимости от того, как вы дозируете семя, идущее в generate_data_udf, ie:

@udf(returnType=IntegerType())
def batch_seed(seed):
    return seed // 10

df.withColumn("batch_seed", batch_seed(col("seed"))). \
groupBy("batch_seed").apply(generate_data_udf).show()
0 голосов
/ 01 июня 2020

вот решение, которое не использует RDD или не создает строки, а только с операцией dataframe:
(код находится в scala, но делать то же самое в python должно быть просто)

val N = 100000

//for seed return array of index and random_value
def generate_data(i: Int): Array[(Int, Double)] = ???
val generate_data_udf = udf (generate_data _)

spark
  .range(N)
  .toDF("seed")
  .withColumn("arr", generate_data_udf($"seed"))
  .select(
    $"seed",
    explode($"arr") as "exp"
  )
  .select(
    $"seed",
    $"exp._1" as "n",
    $"exp._2" as "x"
  )
0 голосов
/ 30 мая 2020

Вот решение вашей проблемы без использования Row - только на основе RDD. Я думаю, что это может быть наиболее эффективным способом, поскольку он использует map для вычисления выходных данных вашей функции и flatMap для объединения этих выходных данных - обе эти операции выполняются на RDD, поэтому все должно быть распределено.

import numpy as np
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('abc').getOrCreate()
sc = spark.sparkContext

def generate_data(one_integer):
  M = 2 # number of values to generate per seed, e.g. 10M
  np.random.seed(one_integer)
  np_array = np.random.random_sample(M) # generates an array of M random values
  return [(one_integer, i, float(np_array[i])) for i in range(M)]

N = 30 # number of seeds to try, e.g. 100K
list_of_integers = [i for i in range(N)]
list_of_integers_rdd = sc.parallelize(list_of_integers)
generated_data_rdd = list_of_integers_rdd.map(lambda x: generate_data(x))
solved_rdd = generated_data_rdd.flatMap(lambda list: list)

df = spark.createDataFrame(solved_rdd).toDF("seed", "n", "x")
df.show()
...