Я запускаю скрипт Python на суперкомпьютерном кластере Sun Grid Engine, который считывает список идентификаторов файлов, отправляет каждый из них рабочему процессу для анализа и записывает один вывод для каждого входного файла на диск.
Проблема в том, что я получаю IOError (110, «Соединение истекло») где-то внутри рабочей функции, и я не уверен почему.Я получал эту ошибку в прошлом, когда выполнял сетевые запросы, которые были сильно задержаны, но в этом случае работник только пытается прочитать данные с диска.
Мой вопрос: что может привести к ошибке тайм-аута соединения при чтении с диска, и как можно устранить эту ошибку?Буду очень признателен за любую помощь, которую могут предложить другие.
Полный сценарий (ошибка IOError возникает в minhash_text()
):
from datasketch import MinHash
from multiprocessing import Pool
from collections import defaultdict
from nltk import ngrams
import json
import sys
import codecs
import config
cores = 24
window_len = 12
step = 4
worker_files = 50
permutations = 256
hashband_len = 4
def minhash_text(args):
'''Return a list of hashband strings for an input doc'''
try:
file_id, path = args
with codecs.open(path, 'r', 'utf8') as f:
f = f.read()
all_hashbands = []
for window_idx, window in enumerate(ngrams(f.split(), window_len)):
window_hashbands = []
if window_idx % step != 0:
continue
minhash = MinHash(num_perm=permutations, seed=1)
for ngram in set(ngrams(' '.join(window), 3)):
minhash.update( ''.join(ngram).encode('utf8') )
hashband_vals = []
for i in minhash.hashvalues:
hashband_vals.append(i)
if len(hashband_vals) == hashband_len:
window_hashbands.append( '.'.join([str(j) for j in hashband_vals]) )
hashband_vals = []
all_hashbands.append(window_hashbands)
return {'file_id': file_id, 'hashbands': all_hashbands}
except Exception as exc:
print(' ! error occurred while processing', file_id, exc)
return {'file_id': file_id, 'hashbands': []}
if __name__ == '__main__':
file_ids = json.load(open('file_ids.json'))
file_id_path_tuples = [(file_id, path) for file_id, path in file_ids.items()]
worker_id = int(sys.argv[1])
worker_ids = list(ngrams(file_id_path_tuples, worker_files))[worker_id]
hashband_to_ids = defaultdict(list)
pool = Pool(cores)
for idx, result in enumerate(pool.imap(minhash_text, worker_ids)):
print(' * processed', idx, 'results')
file_id = result['file_id']
hashbands = result['hashbands']
for window_idx, window_hashbands in enumerate(hashbands):
for hashband in window_hashbands:
hashband_to_ids[hashband].append(file_id + '.' + str(window_idx))
with open(config.out_dir + 'minhashes-' + str(worker_id) + '.json', 'w') as out:
json.dump(dict(hashband_to_ids), out)