PicklingError в PySpark: написание пользовательской функции разделения для фрейма данных - PullRequest
0 голосов
/ 15 февраля 2020

У меня есть цель написать более сложную функцию для разделителя. В частности, мой разделитель берет user_id и возвращает номер раздела на основе расчета другого столбца для этого пользователя. Например:

records = np.random.randint(-100, 100, 10**3)
table = pd.DataFrame.from_dict(dict(Counter(records)), orient='index').reset_index()
table.columns = ['user_id', 'num_records']

table_df = ss.createDataFrame(table)
table_df.show()

+-------+-----------+
|user_id|num_records|
+-------+-----------+
|    -43|          6|
|      7|          5|
|     97|          5|
|     -2|          4|
|      4|          3|
+-------+-----------+

Теперь мой разделитель принимает в качестве входных данных user_id и возвращает некоторую функцию num_records в качестве partition_number.

def simple_partitioner(user_id):
    partition = table_df.where(F.col('user_id') == user_id).select(F.col('num_records'))
    return hash(partition.rdd.flatMap(list).first())

Если я запускаю эту функцию, я получаю PicklingError:

rdd = table_df.rdd.partitionBy(num_partitions, simple_partitioner)

PicklingError: Could not serialize object: Py4JError: An error occurred while calling o908.__getstate__. Trace:
py4j.Py4JException: Method __getstate__([]) does not exist

Что, я думаю, происходит потому, что table_df внутри функции разбиения. Я видел аналогичный вопрос , но он касался соединения между фреймами данных, а не разделения.

Как я могу успешно иметь фрейм данных внутри функции раздела? Или, альтернативно, выполните действие, описанное выше.

...