Распараллеливание с Рэем и получение результатов с тайм-аутом - PullRequest
0 голосов
/ 15 января 2020

У меня возникла проблема с параллельным скребком для документов, который я сделал.

Я использую библиотеку Ray и декоратор @ray.remote, который отлично работает. Проблемы возникают при получении результатов. Это фрагмент моего кода:

ray.init(num_cpus=n_workers)
futures = [worker.remote(x) for x in path_and_dest]

# get results
for doc in futures:
    try:
        ray.get(doc, timeout = timeout)
        pbar1.update(1) # add 1 to success bar
    except RayTimeoutError:
        pbar2.update(1) # add 1 to failure bar
    except Exception as error:
        print("function raised %s" % error)
        print(error.traceback)

Функция worker очищает документ по заданному пути и сохраняет выходные данные в пункт назначения (заданный в качестве аргументов в переменной path_and_dest). Функция, которую я пытаюсь реализовать, убьет процесс, если он займет больше времени, чем набор timeout (в секундах) для очистки документа.

Проблема:

В настоящее время все процессы зависают, если один из них "терпит неудачу" из-за обработки try-кроме и того, как я oop поверх futures -объекта. Например, если я использую 8 ядер и все 8 процессов превышают timeout, все они должны выйти из строя одновременно, сейчас для всех них требуется 8 * timeout секунд.

1 Ответ

0 голосов
/ 11 февраля 2020

Используйте ray.wait() api с while l oop. Подробности в ссылке здесь: https://ray.readthedocs.io/en/latest/package-ref.html#ray .wait

Основная идея - запустить некоторое время l oop в течение определенного времени и увеличить ошибку тайм-аута для фьючерсов wait .

Psuedocode

curr = 0
timeout_cnt = 10
while curr < timeout_cnt:
    ready, wait = ray.wait(futures)
    # Do some ready ID processing here.

    curr += 1
    time.sleep(0.1)
    if len(wait) == 0:
        break
...