Функция Dask, которая спит, не занимая рабочий? - PullRequest
0 голосов
/ 01 мая 2018

В некоторых классах конвейеров данных полезно дождаться завершения какого-либо внешнего процесса, скажем, наблюдая, записан ли файл.
Реализация этого наивно в dask вызывает долгосрочную задачу, которая блокирует работника на все время.

def wait_for_file(filename='some_filename', max_wait_time=600):
    start_time = time.time()
    while True:
        if time.time() - start_time > max_wait_time:
            raise Exception('Timeout')
        if exists(filename):
            return filename
        time.sleep(0.1)

file_exists = delayed(wait_for_file)()    
res = delayed(process_file)(file_exists)

Как сделать так, чтобы этот код не блокировал работника

1 Ответ

0 голосов
/ 01 мая 2018

Используя secede и rejoin, как указано в http://dask.pydata.org/en/latest/futures.html#submit-tasks-from-tasks, вы можете написать эту функцию ожидания следующим образом

def wait_for_file(filename='some_filename', max_wait_time=600):
    start_time = time.time()
    # detach from the scheduler
    distributed.secede()
    try:
        while True:
            if time.time() - start_time > max_wait_time:
                raise Exception('Timeout')
            if exists(filename):
                # rejoin to the pool of dask executor threads and return
                distributed.rejoin()
                return filename
            time.sleep(0.1)
    finally:
        # in the case where something goes wrong you want to rejoin
        # so that your client knows that this function call failed
        distributed.rejoin()
...