Почему DirectView.execute () никогда не готов () в ipyparallel и как я могу удалить? - PullRequest
0 голосов
/ 29 ноября 2018

У меня есть задание ipyparallel, которое занимает слишком много памяти из-за больших результатов, поэтому я изменяю его для использования Client.purge_local_results () и purge_hub_results().Если я использую dview.execute() в качестве рекомендованного , я получаю AsyncResult и могу успешно запустить другие блоки кода в представлении с балансировкой нагрузки, но исходный результат никогда не будет ready().

Кроме того, message_ids из исходного результата остаются в Client.outstanding, что препятствует успешному выполнению команд purge().Использование block=True и block=False приводит к неожиданному поведению.

Ниже приведен пример кода.Это использует Python версии 3.6.7, ipyparallel версии 6.2.2 на CentOS7.

import sys
import time
import ipyparallel

print("Python version {}, ipyparallel version {}".format(sys.version, ipyparallel.__version__))

cluster_profile = sys.argv[1]
try:
    rc = ipyparallel.Client(profile=cluster_profile)
except (ipyparallel.error.TimeoutError, OSError):
    print("The cluster request for '{}' timed out. Is it defined and running?".format(cluster_profile))
    exit(2)

dview = rc[:]
lview = rc.load_balanced_view()

def trivial_fn(x=None):
    import os
    pid = os.getpid()
    filename = '/tmp/{}.pid'.format(pid)
    with open(filename, 'a'):
        os.utime(filename)
    return str(pid) + str(datetime.datetime.now())

if sys.argv[2] == 'block':

    print("Running with block=True.....  (Use Ctrl-C if this hangs).")
    async_exec_withblock = dview.execute('trivial_fn()', block=True)
    # Note, this does hang until pressing Ctrl-C

    print("Blocking OK, got {}. ".format(async_exec_withblock))


    # Note, we never get to the following line.  A second Ctrl-C kills us
    print("Blocking Done, got {}\n:{} ".format(async_exec_withblock, list(async_exec_withblock) ) )
    print("Finished.")
    exit(0)
else:

    print("Running with block=False....  (Use Ctrl-C if this hangs)")
    async_exec_noblock = dview.execute('import datetime', block=False)
    print("Nonblocking request done, got class {}: {}.  Running a map_async() on the load balanced view.....".format(
        async_exec_noblock.__class__, str(async_exec_noblock)
    ))

    mapped_trivial = lview.map_async(trivial_fn, list(range(4)))
    # Note, this successfully prints expected results from the load-balanced view
    print('Load balanced view returned {}'.format(' '.join(mapped_trivial)))


    print('Now waiting for ready() from old non-blocking request....')
    while not async_exec_noblock.ready():
        time.sleep(0.5)
        print('Still waiting.  Current Client.outstanding size is {}....'.format(len(rc.outstanding)))
        # Note, this repeats forever

    # Note, we never get to the following line...
    print("Non-blocking OK, got {}. ".format(list(async_exec_noblock)))
    print("Finished.")
    exit(0)
...