Проблема сводится к следующему: я хочу сгенерировать DataFrame в pyspark, используя существующий распараллеленный набор входных данных, и функция, которая при одном входе может генерировать относительно большой пакет строк. В приведенном ниже примере я хочу сгенерировать фрейм данных 10 ^ 12 строк, используя, например, 1000 исполнителей:
def generate_data(one_integer):
import numpy as np
from pyspark.sql import Row
M = 10000000 # number of values to generate per seed, e.g. 10M
np.random.seed(one_integer)
np_array = np.random.random_sample(M) # generates an array of M random values
row_type = Row("seed", "n", "x")
return [row_type(one_integer, i, float(np_array[i])) for i in range(M)]
N = 100000 # number of seeds to try, e.g. 100K
list_of_integers = [i for i in range(N)]
list_of_integers_rdd = spark.sparkContext.parallelize(list_of_integers)
row_rdd = list_of_integers_rdd.flatMap(list_of_integers_rdd)
from pyspark.sql.types import StructType, StructField, FloatType, IntegerType
my_schema = StructType([
StructField("seed", IntegerType()),
StructField("n", IntegerType()),
StructField("x", FloatType())])
df = spark.createDataFrame(row_rdd, schema=my_schema)
(я действительно не хочу изучать распределение случайных чисел с заданным семенем - это просто пример, которым я был смог придумать, чтобы проиллюстрировать ситуацию, когда большой фрейм данных не загружается со склада, а генерируется кодом)
Приведенный выше код делает в значительной степени именно то, что я хочу. Проблема в том, что он делает это очень неэффективно - за счет создания объекта python Row для каждой строки, а затем преобразования объектов python Row во внутреннее столбцовое представление Spark.
Есть ли способ Я могу преобразовать пакет строк уже в столбцовом представлении (например, один или несколько массивов numpy, как указано выше np_array
), просто сообщив Spark, что это столбцы пакета значений?
Например, я может написать код для создания RDD коллекции python, где каждый элемент представляет собой pyarrow.RecordBatch или pandas .DataFrame, но я не могу найти способ преобразовать любой из них в Spark DataFrame без создания RDD строки pyspark объектов в процессе.
Существует по крайней мере дюжина статей с примерами того, как я могу использовать pyarrow + pandas для эффективного преобразования локального (в драйвер) pandas фрейма данных в фрейм данных Spark, но это не вариант для меня, потому что мне нужно, чтобы данные были фактически сгенерированы распределенным способом по исполнителям, а не генерировались pandas dataframe на драйвере и отправка его исполнителям.
UPD. Я нашел один способ избежать создания объектов Row - используя RDD python кортежей. Как и ожидалось, это все еще слишком медленно, но все же немного быстрее, чем при использовании объектов Row. Тем не менее, это не совсем то, что я ищу (это действительно эффективный способ передачи столбчатых данных в Spark из python).
Также измерено время для выполнения определенных операций на машине (грубый способ с довольно небольшими вариациями в измеренном времени, но, на мой взгляд, он репрезентативен): рассматриваемый набор данных составляет 10M строк, 3 столбца (один столбец является постоянным целым числом, другой - целочисленным диапазоном от 0 до 10M-1, третий - плавающим значение точки, сгенерированное с использованием np.random.random_sample
:
- Локально сгенерировать pandas фрейм данных (10M строк): ~ 440-450 мс
- Локально создать python список искр. sql .Row объекты (10 млн строк): ~ 12-15 с
- Локально генерировать python список кортежей, представляющих строки (10 млн строк): ~ 3,4-3,5 с
Создать фрейм данных Spark с использованием всего 1 исполнителя и 1 начального начального значения:
- с использованием
spark.createDataFrame(row_rdd, schema=my_schema)
: ~ 70-80 с - с использованием
spark.createDataFrame(tuple_rdd, schema=my_schema)
: ~ 40-45 с - ( нераспределенное создание) с использованием
spark.createDataFrame(pandas_df, schema=my_schema)
: ~ 0,4-0,5 с (без самой генерации pandas df, которая занимает примерно такое же время) - с spark.sql.execution.arrow.enabled
, установленным в значение true.
Пример с локальным для драйвера pandas фрейм данных, преобразованный в фрейм данных Spark за ~ 1 с для 10M строк, дает мне повод для полагаю, то же самое должно быть возможно с фреймами данных, созданными в исполнителях. Однако самое быстрое, что я могу достичь сейчас, составляет ~ 40 секунд для 10M строк с использованием RDD из python кортежей.
Итак, вопрос все еще остается - есть ли способ эффективно сгенерировать большой фрейм данных Spark распределенным образом в pyspark?