В некоторых классах конвейеров данных полезно дождаться завершения какого-либо внешнего процесса, скажем, наблюдая, записан ли файл.
Реализация этого наивно в dask вызывает долгосрочную задачу, которая блокирует работника на все время.
def wait_for_file(filename='some_filename', max_wait_time=600):
start_time = time.time()
while True:
if time.time() - start_time > max_wait_time:
raise Exception('Timeout')
if exists(filename):
return filename
time.sleep(0.1)
file_exists = delayed(wait_for_file)()
res = delayed(process_file)(file_exists)
Как сделать так, чтобы этот код не блокировал работника