Я хотел бы конвертировать файлы из одного формата в другой, используя 32 ядра, которые у меня есть.У меня есть этот кусок кода, который должен делать то, что я хочу.Т.е. перебрать все файлы и преобразовать XML-подобные файлы в паркет.Порядок не имеет значения.Поэтому, когда задание заканчивается, оно должно просто перейти к следующему файлу и преобразовать его, не дожидаясь других заданий.Общее количество заданий не должно превышать максимальное значение, указанное в procs
.Это начинает очень многообещающе, но после первой итерации процесс в основном останавливается.Я тоже пробовал Pool.map
, Pool.apply_async
.Я использую контекст pyspark в блокноте jupyter, но не уверен, важно ли это здесь.
import pandas as pd
from pyspark import SQLContext
from pyteomics import mzxml
from glob import glob
sc = SparkContext.getOrCreate()
sqlContext = SQLContext(sc)
from multiprocessing import Process, Queue, Pool
from random import shuffle
def mzxml_to_pandas_df(filename):
slices = []
file = mzxml.MzXML(filename)
# print('Reading:', filename)
while True:
try:
slices.append(pd.DataFrame(file.next()))
except:
break
return pd.concat(slices)
def fix_column_names_for_spark(df):
df.rename(columns={'m/z array': 'mz_array',
'intensity array': 'intensity_array'}, inplace=True)
def mzxml_to_parquet(filename):
new_name = filename.replace('.mzXML', '.parquet')
print(new_name, end=" ")
if os.path.isdir(new_name):
print('already exists.')
return None
df = mzxml_to_pandas_df(filename)
fix_column_names_for_spark(df)
sqlContext.createDataFrame(df).write.save(new_name)
print('done')
mzxml_files = glob('./data/*mzXML') # list of filenames
procs = 10
pool = Pool(procs)
shuffle(mzxml_files) # Does not help.
pool.map_async(mzxml_to_parquet, mzxml_files)
pool.close()
pool.join()
Почему этот код останавливается в какой-то момент, не продолжая работу с остальными файлами?Меня интересует, как правильно использовать многопроцессорность, а также более элегантные способы выполнения задачи.Спасибо!
Теперь я смог воспроизвести эти ошибки.У меня действительно возникает проблема с искрой:
ОШИБКА: py4j.java_gateway: Произошла ошибка при попытке подключения к серверу Java (127.0.0.1:34967) Traceback (последний вызов был последним):
Файл "/home/.../software/spark-2.3.1-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", строка 1145,в send_command self.socket.sendall (command.encode ("utf-8")) BrokenPipeError: [Errno 32] Broken pipe
Во время обработки вышеуказанного исключения произошло другое исключение:
Трассировка (последний вызов был последним): файл "/home/.../software/spark-2.3.1-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", строка 985, в ответе send_command = файл connection.send_command (команда)" /home/.../software/spark-2.3.1-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip / py4j / java_gateway.py ", строка 1149, в send_command" Ошибка при отправке ", e, proto.ERROR_ON_SEND) py4j.protocol.Py4JNetworkError: Ошибка при отправке
Во время обработки вышеуказанного исключенияn, произошло другое исключение:
Traceback (последний вызов был последним): файл "/home/.../software/spark-2.3.1-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip / py4j / java_gateway.py ", строка 929, в _get_connection connection = self.deque.pop () IndexError: всплывающее сообщение из пустой очереди
Во время обработки вышеуказанного исключения другое исключениеПроизошло:
Трассировка (последний последний вызов): Файл "/home/.../software/spark-2.3.1-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip / py4j / java_gateway.py ", строка 1067, при запуске self.socket.connect ((self.address, self.port)) ConnectionRefusedError: [Errno 111] Соединение отклонено
Делает ли этозначит многопроцессорность не должна использоваться вместе с Spark?