Python multiprocessing.Pool.map_async () не выполняет всю работу с SparkContext - PullRequest
0 голосов
/ 22 января 2019

Я хотел бы конвертировать файлы из одного формата в другой, используя 32 ядра, которые у меня есть.У меня есть этот кусок кода, который должен делать то, что я хочу.Т.е. перебрать все файлы и преобразовать XML-подобные файлы в паркет.Порядок не имеет значения.Поэтому, когда задание заканчивается, оно должно просто перейти к следующему файлу и преобразовать его, не дожидаясь других заданий.Общее количество заданий не должно превышать максимальное значение, указанное в procs.Это начинает очень многообещающе, но после первой итерации процесс в основном останавливается.Я тоже пробовал Pool.map, Pool.apply_async.Я использую контекст pyspark в блокноте jupyter, но не уверен, важно ли это здесь.

import pandas as pd
from pyspark import SQLContext
from pyteomics import mzxml
from glob import glob

sc = SparkContext.getOrCreate()
sqlContext = SQLContext(sc)
from multiprocessing import Process, Queue, Pool
from random import shuffle

def mzxml_to_pandas_df(filename):
    slices = []
    file = mzxml.MzXML(filename)
    # print('Reading:', filename)
    while True:
        try:
            slices.append(pd.DataFrame(file.next()))
        except:
            break
    return pd.concat(slices)

def fix_column_names_for_spark(df):
    df.rename(columns={'m/z array': 'mz_array', 
                       'intensity array': 'intensity_array'}, inplace=True)

def mzxml_to_parquet(filename):
    new_name = filename.replace('.mzXML', '.parquet')
    print(new_name, end=" ")
    if os.path.isdir(new_name):
        print('already exists.')
        return None
    df = mzxml_to_pandas_df(filename)
    fix_column_names_for_spark(df)
    sqlContext.createDataFrame(df).write.save(new_name)
    print('done')

mzxml_files = glob('./data/*mzXML')  # list of filenames
procs = 10
pool = Pool(procs)
shuffle(mzxml_files)  # Does not help. 
pool.map_async(mzxml_to_parquet, mzxml_files)
pool.close()
pool.join()

Почему этот код останавливается в какой-то момент, не продолжая работу с остальными файлами?Меня интересует, как правильно использовать многопроцессорность, а также более элегантные способы выполнения задачи.Спасибо!

Теперь я смог воспроизвести эти ошибки.У меня действительно возникает проблема с искрой:

ОШИБКА: py4j.java_gateway: Произошла ошибка при попытке подключения к серверу Java (127.0.0.1:34967) Traceback (последний вызов был последним):
Файл "/home/.../software/spark-2.3.1-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", строка 1145,в send_command self.socket.sendall (command.encode ("utf-8")) BrokenPipeError: [Errno 32] Broken pipe

Во время обработки вышеуказанного исключения произошло другое исключение:

Трассировка (последний вызов был последним): файл "/home/.../software/spark-2.3.1-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", строка 985, в ответе send_command = файл connection.send_command (команда)" /home/.../software/spark-2.3.1-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip / py4j / java_gateway.py ", строка 1149, в send_command" Ошибка при отправке ", e, proto.ERROR_ON_SEND) py4j.protocol.Py4JNetworkError: Ошибка при отправке

Во время обработки вышеуказанного исключенияn, произошло другое исключение:

Traceback (последний вызов был последним): файл "/home/.../software/spark-2.3.1-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip / py4j / java_gateway.py ", строка 929, в _get_connection connection = self.deque.pop () IndexError: всплывающее сообщение из пустой очереди

Во время обработки вышеуказанного исключения другое исключениеПроизошло:

Трассировка (последний последний вызов): Файл "/home/.../software/spark-2.3.1-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip / py4j / java_gateway.py ", строка 1067, при запуске self.socket.connect ((self.address, self.port)) ConnectionRefusedError: [Errno 111] Соединение отклонено

Делает ли этозначит многопроцессорность не должна использоваться вместе с Spark?

...