Как обрабатывать несколько фреймов Pyspark параллельно - PullRequest
0 голосов
/ 29 января 2020

У меня есть фрейм данных pyspark с миллионами записей и сотнями столбцов (пример ниже)

clm1, clm2, clm3
code1,xyz,123
code2,abc,345
code1,qwe,456

Я хочу разделить его на несколько фреймов данных на основе clm1, т.е. отдельный фрейм данных для clm1=code1 отдельный фрейм данных для clm1=code2 и т. д., а затем обработайте их и запишите результат в отдельные файлы. Я хочу выполнить эту операцию параллельно, чтобы ускорить процесс. Я использую следующий код:

S1 = myclass("code1")
S2 = myclass("code2")


t1 = multiprocessing.Process(target=S1.processdata,args=(df,))
t2 = multiprocessing.Process(target=S2.processdata,args=(df,))
t1.start()
t2.start()

t1.join()
t2.join()

, но я получаю ошибку ниже

Method __getstate__([]) does not exist

Если я использую threading.Thread вместо multiprocessing.Process, это работает нормально, но это не так похоже, сократить общее время

1 Ответ

0 голосов
/ 30 января 2020

Об ошибке

Метод GetState ([]) не существует

Это py4j.Py4JException. У вас есть эта ошибка с multiprocessing.Process, потому что этот модуль использует процессы. С другой стороны, threading.Thread использует потоки, которые используют одну и ту же память, поэтому они могут совместно использовать объект dataframe.

Также взгляните на этот вопрос-ответ SO: Многопроцессорная обработка против потоков Python


Общие советы

Я понимаю, что, возможно, вы новичок в мире Spark, и я предлагаю вам свое решение вашей проблемы. Вы спрашивали, как выполнять многопроцессорную обработку, но если у вас есть Spark, возможно, это не лучший метод.

У вас есть Spark- платформа для параллельной обработки , вам не нужно распараллеливать вручную Ваша задача.

Spark разработан для параллельных вычислений в кластере, но он отлично работает на большом отдельном узле. Многопроцессорная библиотека полезна в Python вычислительных задачах, в Spark / Pyspark все вычисления выполняются параллельно в JVM.

В python_code.py

import pyspark.sql.functions as f


# JOB 1
df1 = df.filter(f.col('clm1')=='code1')
... many transformations
df1.write.format('..')..

# JOB 2
df2 = df.filter(f.col('clm1')=='code2')
... many transformations
df2.write.format('..')..

И затем запускайте этот код с помощью spark - отправьте, используя все ваши ядра (* = все ядра)

# Run application locally on all cores
./bin/spark-submit --master local[*] python_code.py

При таком подходе вы используете силу Spark. Задания будут выполняться последовательно, НО у вас будет: загрузка ЦП все время <=> параллельная обработка <=> меньшее время вычислений

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...