У меня возникла проблема с параллельным скребком для документов, который я сделал.
Я использую библиотеку Ray и декоратор @ray.remote
, который отлично работает. Проблемы возникают при получении результатов. Это фрагмент моего кода:
ray.init(num_cpus=n_workers)
futures = [worker.remote(x) for x in path_and_dest]
# get results
for doc in futures:
try:
ray.get(doc, timeout = timeout)
pbar1.update(1) # add 1 to success bar
except RayTimeoutError:
pbar2.update(1) # add 1 to failure bar
except Exception as error:
print("function raised %s" % error)
print(error.traceback)
Функция worker
очищает документ по заданному пути и сохраняет выходные данные в пункт назначения (заданный в качестве аргументов в переменной path_and_dest
). Функция, которую я пытаюсь реализовать, убьет процесс, если он займет больше времени, чем набор timeout
(в секундах) для очистки документа.
Проблема:
В настоящее время все процессы зависают, если один из них "терпит неудачу" из-за обработки try-кроме и того, как я oop поверх futures
-объекта. Например, если я использую 8 ядер и все 8 процессов превышают timeout
, все они должны выйти из строя одновременно, сейчас для всех них требуется 8 * timeout
секунд.