Задача оптимизации PySpark UDF - PullRequest
3 голосов
/ 10 июля 2020

Я пытаюсь оптимизировать приведенный ниже код. Выполнение при запуске с 1000 строками данных занимает около 12 минут. Наш вариант использования требует, чтобы размер данных составлял от 25 до 50 тысяч строк, что сделало бы эту реализацию совершенно невыполнимой. методы spacy.load () и load_glove (), вызываемые на каждой итерации? Есть ли способ подготовить данные load_glove () один раз для каждого рабочего узла, а не один раз для каждой строки данных? Метод load_glove возвращает объект словаря размером до 5 ГБ. Есть ли способ подготовить это на главном узле, а затем передать как параметр в UDF?

Оцените ваши предложения. Заранее спасибо!

Ответы [ 2 ]

3 голосов
/ 11 июля 2020

Да, в текущей реализации весь код загрузки модели будет выполняться каждый раз при запуске вашей функции, что далеко не оптимально. Невозможно передать его напрямую от драйвера к рабочим узлам, но есть аналогичный способ - инициализировать модель на каждом рабочем узле, но только один раз. Для этого вам придется использовать ленивую функцию, которая будет выполняться только тогда, когда потребуется фактический результат - так, на рабочих.

Попробуйте сделать что-то вроде этого:

# Here we are not loading the model at the loading time, only the worker code
# will invoke this routine and gets the spacy object. Which means we are loading
# new spacy models on every executors.
SPACY_MODEL = None
def get_spacy_model():
    global SPACY_MODEL
    if not SPACY_MODEL:
       _model = spacy.load('en', max_length=6000000)
    SPACY_MODEL = _model
    return SPACY_MODEL

@udf(returnType=Types.ArrayType(Types.FloatType()))
def generateVectorRepresentation(text):
  # TODO: move the load function out if posible, and remove unused modules 
  # nlp = spacy.load('en', disable=['parser', 'tagger'])
  nlp = get_spacy_model()
  # your further processing

Я думаю, вы можете попробовать добавить код загрузки перчатки в аналогичную функцию.

Вы можете попробовать прочитать об этом здесь: https://haridas.in/run-spacy-jobs-on-apache-spark.html (это не мой блог, только что нашел эту информацию при попытке проделать то же самое с моделью Spacy).

0 голосов
/ 11 июля 2020

Главное, что делает udf-ы такими медленными, это то, что они не могут быть оптимизированы с помощью Spark (они рассматриваются как черный ящик). Поэтому, чтобы сделать это быстрее, вам нужно вынуть как можно больше и заменить его функциями ванильной искры. Идеально было бы оставить в udf только часть spacy (я не знаком с этим модулем), получить результирующий DF и выполнить остальные преобразования, необходимые с функциями vanilla spark.

Например, load_glove() будет выполняться для каждой строки, как говорит другой ответ. Но если посмотреть на код, его формат выглядит так, как будто его можно превратить в фрейм данных с 301 столбцом. Затем вы можете присоединиться к этому, чтобы получить необходимые значения. (если вы можете заставить другой DF иметь word.text в качестве ключей, это немного сложно сказать без данных, но теоретически это кажется возможным).

...