У меня есть цель написать более сложную функцию для разделителя. В частности, мой разделитель берет user_id и возвращает номер раздела на основе расчета другого столбца для этого пользователя. Например:
records = np.random.randint(-100, 100, 10**3)
table = pd.DataFrame.from_dict(dict(Counter(records)), orient='index').reset_index()
table.columns = ['user_id', 'num_records']
table_df = ss.createDataFrame(table)
table_df.show()
+-------+-----------+
|user_id|num_records|
+-------+-----------+
| -43| 6|
| 7| 5|
| 97| 5|
| -2| 4|
| 4| 3|
+-------+-----------+
Теперь мой разделитель принимает в качестве входных данных user_id и возвращает некоторую функцию num_records в качестве partition_number.
def simple_partitioner(user_id):
partition = table_df.where(F.col('user_id') == user_id).select(F.col('num_records'))
return hash(partition.rdd.flatMap(list).first())
Если я запускаю эту функцию, я получаю PicklingError:
rdd = table_df.rdd.partitionBy(num_partitions, simple_partitioner)
PicklingError: Could not serialize object: Py4JError: An error occurred while calling o908.__getstate__. Trace:
py4j.Py4JException: Method __getstate__([]) does not exist
Что, я думаю, происходит потому, что table_df
внутри функции разбиения. Я видел аналогичный вопрос , но он касался соединения между фреймами данных, а не разделения.
Как я могу успешно иметь фрейм данных внутри функции раздела? Или, альтернативно, выполните действие, описанное выше.