У меня есть задание ipyparallel
, которое занимает слишком много памяти из-за больших результатов, поэтому я изменяю его для использования Client.purge_local_results () и purge_hub_results()
.Если я использую dview.execute()
в качестве рекомендованного , я получаю AsyncResult
и могу успешно запустить другие блоки кода в представлении с балансировкой нагрузки, но исходный результат никогда не будет ready()
.
Кроме того, message_ids из исходного результата остаются в Client.outstanding
, что препятствует успешному выполнению команд purge()
.Использование block=True
и block=False
приводит к неожиданному поведению.
Ниже приведен пример кода.Это использует Python версии 3.6.7, ipyparallel версии 6.2.2 на CentOS7.
import sys
import time
import ipyparallel
print("Python version {}, ipyparallel version {}".format(sys.version, ipyparallel.__version__))
cluster_profile = sys.argv[1]
try:
rc = ipyparallel.Client(profile=cluster_profile)
except (ipyparallel.error.TimeoutError, OSError):
print("The cluster request for '{}' timed out. Is it defined and running?".format(cluster_profile))
exit(2)
dview = rc[:]
lview = rc.load_balanced_view()
def trivial_fn(x=None):
import os
pid = os.getpid()
filename = '/tmp/{}.pid'.format(pid)
with open(filename, 'a'):
os.utime(filename)
return str(pid) + str(datetime.datetime.now())
if sys.argv[2] == 'block':
print("Running with block=True..... (Use Ctrl-C if this hangs).")
async_exec_withblock = dview.execute('trivial_fn()', block=True)
# Note, this does hang until pressing Ctrl-C
print("Blocking OK, got {}. ".format(async_exec_withblock))
# Note, we never get to the following line. A second Ctrl-C kills us
print("Blocking Done, got {}\n:{} ".format(async_exec_withblock, list(async_exec_withblock) ) )
print("Finished.")
exit(0)
else:
print("Running with block=False.... (Use Ctrl-C if this hangs)")
async_exec_noblock = dview.execute('import datetime', block=False)
print("Nonblocking request done, got class {}: {}. Running a map_async() on the load balanced view.....".format(
async_exec_noblock.__class__, str(async_exec_noblock)
))
mapped_trivial = lview.map_async(trivial_fn, list(range(4)))
# Note, this successfully prints expected results from the load-balanced view
print('Load balanced view returned {}'.format(' '.join(mapped_trivial)))
print('Now waiting for ready() from old non-blocking request....')
while not async_exec_noblock.ready():
time.sleep(0.5)
print('Still waiting. Current Client.outstanding size is {}....'.format(len(rc.outstanding)))
# Note, this repeats forever
# Note, we never get to the following line...
print("Non-blocking OK, got {}. ".format(list(async_exec_noblock)))
print("Finished.")
exit(0)