Как узнать количество выполненных или оставшихся заданий map_async? - PullRequest
2 голосов
/ 05 октября 2011

Я использую средство параллельной обработки iPython для работы с большой картой. В ожидании завершения операции карты я хотел бы показать пользователю, сколько заданий завершено, сколько запущено и сколько осталось. Как я могу найти эту информацию?

Вот что я делаю. Я создаю профиль, который использует локальный движок и запускаю двух рабочих. В оболочке:

$ ipython profile create --parallel --profile=local
$ ipcluster start --n=2 --profile=local

Вот клиентский скрипт Python:

#!/usr/bin/env python

def meat(i):
    import numpy as np
    import time
    import sys
    seconds = np.random.randint(2, 15)
    time.sleep(seconds)
    return seconds

import time
from IPython.parallel import Client

c = Client(profile='local')
dview = c[:]

ar = dview.map_async(meat, range(4))
elapsed = 0
while True:
    print 'After %d s: %d running' % (elapsed, len(c.outstanding))
    if ar.ready():
        break
    time.sleep(1)
    elapsed += 1
print ar.get()

Пример вывода из скрипта:

After 0 s: 2 running
After 1 s: 2 running
After 2 s: 2 running
After 3 s: 2 running
After 4 s: 2 running
After 5 s: 2 running
After 6 s: 2 running
After 7 s: 2 running
After 8 s: 2 running
After 9 s: 2 running
After 10 s: 2 running
After 11 s: 2 running
After 12 s: 2 running
After 13 s: 2 running
After 14 s: 1 running
After 15 s: 1 running
After 16 s: 1 running
After 17 s: 1 running
After 18 s: 1 running
After 19 s: 1 running
After 20 s: 1 running
After 21 s: 1 running
After 22 s: 1 running
After 23 s: 1 running
[9, 14, 10, 3]

Как видите, я могу получить количество запущенных в данный момент заданий, но не количество выполненных (или оставшихся) заданий. Как я могу узнать, сколько работ map_async завершено?

1 Ответ

3 голосов
/ 06 октября 2011

AsyncResult имеет атрибут msg_ids. Выдающиеся задания - это пересечение этого с rc.outstanding, а выполненные задания - это разница:

msgset = set(ar.msg_ids)
completed = msgset.difference(rc.outstanding)
pending = msgset.intersection(rc.outstanding)
...