Распараллелить функцию сбора искр - PullRequest
0 голосов
/ 19 сентября 2019

Я заметил, что функция spark, collect, чрезвычайно медленная на больших наборах данных, поэтому я пытаюсь исправить это с помощью распараллеливания.

Мой основной метод создает сеанс spark и передает егоget_data func.

def main():
    spark = SparkSession.builder.appName('app_name').getOrCreate()
    return get_data(spark)

Вот где я пытаюсь распараллелить мою функцию сбора

def get_data(spark):
    df = all_data(spark)
    data = spark.sparkContext.parallelize(df.select('my_column').distinct().collect())
    return map(lambda row: row['my_column'], data)

Это не работает и возвращает эту ошибку:

TypeError: объект 'RDD' не повторяется

Есть ли у кого-нибудь идеи о том, как распараллелить или увеличить производительность функции get_data.

1 Ответ

2 голосов
/ 20 сентября 2019

Здесь приведены примеры статических и динамических подходов с использованием переменной broadcast ( переменная только для чтения сохраняется в каждой памяти исполнителя; избегает передачи копии спискана машине-драйвере для каждой распределенной задачи ), чтобы получить отдельные значения столбца.Кроме того, если вы не предоставите жестко запрограммированное значение во время pivot, оно вызовет дополнительное задание (широкое преобразование в случайном порядке), чтобы получить различные значения для этого столбца.

Отказ от ответственности => может быть лучшая альтернатива для динамического подхода с точки зрения производительности

print(spark.version)
2.4.3

import pyspark.sql.functions as F

# sample data
rawData = [(1, "a"),
           (1, "b"),
           (1, "c"),
           (2, "a"),
           (2, "b"),
           (2, "c"),
           (3, "a"),
           (3, "b"),
           (3, "c")]

df = spark.createDataFrame(rawData).toDF("id","value")

# static list example
l = ["a", "b", "c"]
l = spark.sparkContext.broadcast(l)

pivot_static_df = df\
  .groupby("id")\
  .pivot("value", l.value)\
  .agg(F.expr("first(value)"))

pivot_static_df.show()
+---+---+---+---+
| id|  a|  b|  c|
+---+---+---+---+
|  1|  a|  b|  c|
|  3|  a|  b|  c|
|  2|  a|  b|  c|
+---+---+---+---+

# dynamic list example
v = df.select("value").distinct().rdd.flatMap(lambda x: x).collect()
v = spark.sparkContext.broadcast(v)

print(v.value)

pivot_dynamic_df = df\
  .groupby("id")\
  .pivot("value", l.value)\
  .agg(F.expr("first(value)"))

pivot_dynamic_df.show()
['c', 'b', 'a']
+---+---+---+---+
| id|  a|  b|  c|
+---+---+---+---+
|  1|  a|  b|  c|
|  3|  a|  b|  c|
|  2|  a|  b|  c|
+---+---+---+---+
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...