Я новичок в Pyspark и немного не понимаю, как думать о проблеме.
У меня большой фрейм данных, и я хотел бы отфильтровать каждое подмножество этого фрейма данных на основе двух столбцов и запустить это с помощью того же алгоритма.
Вот пример того, как я запускаю его (крайне неэффективно) сейчас:
for letter in ['a', 'b', 'c']:
for number in [1, 2, 3]
filtered_DF_1, filtered_DF_2 = filter_func(DF_1, DF_2, letter, number)
process_function(filtered_DF_1, filtered_DF_2)
Basi c функция фильтрации:
def filter_func(DF_1, DF_2, letter, number):
DF_1 = DF_1.filter(
(F.col("Letter") == letter) &
(F.col('Number') == number)
)
DF_2 = DF_2.filter(
(F.col("Letter") == letter) &
(F.col('Number') == number)
)
return DF_1, DF_2
Поскольку это Pyspark, я хотел бы распараллелить его, поскольку каждая итерация функции может выполняться независимо.
Нужно ли мне делать какое-то сопоставление, чтобы получить все мои подмножества данных? И затем мне нужно сделать что-нибудь с process_function
, чтобы он был доступен для всех узлов, чтобы он запускался и возвращал ответ?
Как лучше всего это сделать? РЕДАКТИРОВАТЬ:
process_function
берет отфильтрованный набор данных и запускает его через около 7 различных функций, которые уже написаны в 300 строках pyspark -> конечная цель - вернуть список временных меток, которые забронированы основанный на кучке сложных логарифмов c.
Я думаю, что мой план состоит в том, чтобы создать словарь букв -> [число], затем разложить этот список, чтобы получить каждую перестановку и создать из него набор данных. Затем сопоставьте это, и, надеюсь, я смогу создать udf для моей функции process_function.