Почему задача PySpark занимает слишком много времени? - PullRequest
0 голосов
/ 05 апреля 2020

Я запускаю процесс Pyspark, который работает без проблем. Первым шагом процесса является применение указанного c UDF к кадру данных. Это функция:

import html2text

class Udfs(object):
    def __init__(self):
        self.h2t = html2text.HTML2Text()
        self.h2t.ignore_links = True
        self.h2t.ignore_images = True

    def extract_text(self, raw_text):
        try:
            texto = self.h2t.handle(raw_text)
        except:
            texto = "PARSE HTML ERROR"
        return texto

Вот как я применяю UDF:

import pyspark.sql.functions as f
import pyspark.sql.types as t
from udfs import Udfs

udfs = Udfs()
extract_text_udf = f.udf(udfs.extract_text, t.StringType())
df = df.withColumn("texto", extract_text_udf("html_raw"))

Он обрабатывает приблизительно 29 миллионов строк и 300 ГБ. Проблема в том, что для выполнения некоторых задач требуется слишком много времени. Среднее время выполнения задач:

average times

Другие задачи выполнены с продолжительностью более 1 часа.

Но некоторые задачи обработка занимает слишком много времени:

task time

Процесс выполняется в AWS с EMR в кластере с 100 узлами, каждый узел с 32 ГБ ОЗУ и 4 процессора. Также включается спекуляция.

Где проблема с этими задачами? Это проблема с UDF? Это проблема с нитками?

Ответы [ 2 ]

4 голосов
/ 11 апреля 2020

Моя интуиция заключается в том, что вы используете слишком много разделов. Я бы сделал первую попытку, значительно сократив их количество. Вы можете найти этот интересный пост на эту тему.

Если ваши разделы сбалансированы, у вас есть 29 millions /80k partitions = 362 наблюдений в среднем по разделам. Я полагаю, этого недостаточно. Вы тратите много времени на планирование задач, а не на их выполнение.

Ситуация становится хуже, если у вас нет сбалансированных разделов (см. здесь . Это обычно создает узкие места, что и является в вашем случае, похоже, происходит. Есть несколько вариантов:

  • Вы можете coalesce свои данные на меньшее количество разделов. Это лучше, чем использовать repartition, потому что это позволяет избежать полных перемешиваний
  • repartitionByRange если вы хотите, чтобы ваши данные были разделены на несколько столбцов, у вас не будет таких сбалансированных разделов, как с coalesce или repartition, но это может быть полезно с последними, если вам нужно использовать операции с этими разделительными столбцами

Вы можете изменить значения по умолчанию для раздела с помощью spark.sql.shuffle.partitions и spark.default.parallelism.

По моему опыту это предположение. Поиск подходящего числа раздел сложный, но оно того стоит. Дайте мне знать, если это помогло или у вас все еще есть узкие места.

0 голосов
/ 13 апреля 2020

Я нашел решение, используя repartitionByRange в первом кадре данных. При присвоении правильного идентификатора и количества разделов он уравновешивает количество строк в каждом разделе.

...