Метод для всех строк PySpark DataFrame - PullRequest
0 голосов
/ 26 апреля 2018

У меня проблемы с созданием рабочего udf для моей задачи в PySpark (python = 2.7, pyspark = 1.6)

У меня есть data DataFrame, который выглядит так:

+-----------------+
|         sequence|
+-----------------+
|         idea can|  
|        fuel turn|          
|      found today|  
|           a cell|         
|administration in|           
+-----------------+

И для каждой строки в data Я хотел бы просмотреть информацию в другом DataFrame с именем ggrams (на основе атрибута sequence), вычислить агрегаты и вернуть его в виде нового столбца в data.

У меня такое чувство, что я должен сделать это так:

from pyspark.sql.types import IntegerType
from pyspark.sql.functions import udf

def compute_aggregates(x):
    res = ggrams.filter((ggrams.ngram.startswith(x)) \
                              ).groupby("ngram").sum("match_count")
    return res.collect()[0]['sum(match_count)']

aggregate = udf(compute_aggregates, IntegerType())
result = data.withColumn('aggregate', compute_aggregates('sequence'))

Но это возвращает PicklingError.

PicklingError: Could not serialize object: Py4JError: An error occurred while calling o1759.__getstate__. Trace:
py4j.Py4JException: Method __getstate__([]) does not exist

1 Ответ

0 голосов
/ 26 апреля 2018

Ошибка выдается, потому что вы не можете получить доступ к другому фрейму данных в udf другого. Самый простой способ исправить это - собрать фрейм данных, к которому вы хотите обратиться.

Другой вариант - перекрестное соединение, но по опыту я могу сказать, что сбор другого фрейма данных происходит быстрее. (У меня нет математики / статистики, чтобы подтвердить это)

так:
1. Соберите фрейм данных, который вы хотите использовать в формате udf.
2. Вызовите этот собранный фрейм данных (который теперь является списком) в вашем udf, теперь вы можете / должны использовать логику python, поскольку вы говорите со списком объектов

Примечание. Постарайтесь ограничить сбор данных до минимума, выберите только те столбцы, которые вам нужны

Обновление:
Если вы имеете дело с очень большим набором, который сделает сбор невозможным, кросс-соединение, скорее всего, теперь сработает (по крайней мере, для меня это не сработало). Проблема заключается в том, что для создания перекрестного произведения двух кадров данных потребуется так много времени, что для соединения с рабочим узлом истечет время ожидания, что приведет к ошибке широковещания. Посмотрите, можете ли вы каким-либо образом ограничить используемые вами столбцы или есть возможность отфильтровать строки, о которых вы точно знаете, что они не будут использоваться.

Если все это не удалось, посмотрите, можете ли вы создать какой-то пакетный подход *, поэтому запустите только первые X строк с собранными данными, если это будет сделано, загрузите следующие X строк. Скорее всего, это будет ужасно медленно, но, по крайней мере, это не истечет время ожидания (я думаю, я не пробовал это лично, так как в моем случае сбор был возможен)

* пакетирует и фрейм данных, на котором вы запускаете udf, и другой фрейм данных, так как вы все еще не можете собрать внутри udf, потому что вы не можете получить доступ к фрейму данных оттуда

...