Dask игнорирует задачи, потому что они имеют одинаковое «имя», даже с разными входами - PullRequest
0 голосов
/ 03 июля 2019

Чтобы уменьшить объем кода, который мне нужно написать, у меня есть родительский класс, отправляющий функции клиентам dask, а дети реализуют детали функции, переопределяя метод. Кажется, Dask перезаписывает / удаляет задачи, потому что они имеют одинаковое «имя». Я не уверен, как заставить dask кратко назвать каждую задачу, не перезаписывая предыдущие.

мой общий график заданий на поставку выглядит следующим образом:

input1 ----> task1 ----> task2 ---->\
                                    ||=>combine---->||=> combine
input2 ----> task1 ----> task2 ---->/               /
                                                   /
input3 ----> task1 ----> task2 ------------------>/

но наблюдаемое поведение:

input1 ----> task1 ----> task2 ---->\
                                    ||=>combine---->
input1 ----> task1 ----> task2 ---->/               

(обратите внимание на изменение ввода !!)

Один из рабочих, но очень неидеальных подходов заключается в том, чтобы дочерние классы реализовывали метод key_generator () на основе входных данных проблемы. что-то вроде

class ChildClass(ParentClass):
    METHOD_NAME='child1'
    def __init__(self):
        super().__init__()

    def _generate_key(self,input):
        return '{}{}'.format(self.METHOD_NAME,input)

Однако на панели инструментов dask это приводит к тому, что каждый уникальный вызов метода + параметра обрабатывается как отдельная функция. Это работает, но становится трудно отследить, сколько выполнений осталось на процесс.

Вот конкретный пример этого в действии:

######################### class defs ##########################

class Superclass:

    def __call__(self, future, dask_client, inputs):
        key = self._get_process_key()
        return dask_client.submit(self._execute,future,inputs,key=key)

    def _execute(self,future,*args):
        return None

    def _get_process_key(self):
        return None

class Process1(Superclass):

    def __init__(self):
        super().__init__()

    def _execute(self,future,inputs):
        future+=1
        return future

    def _get_process_key(self):
        return 'process1'

class Process2(Superclass):

    def __init__(self):
        super().__init__()

    def _execute(self,future,inputs):
        future*=2
        return future

    def _get_process_key(self):
        return 'process2'


class CombineProcesses(Superclass):

    def __init__(self):
        super().__init__()

    def _execute(self,future1,future2):
        combined = future1 + future2
        return combined

    def _get_process_key(self):
        return 'combine'

###################### instantiate ######################

import dask.distributed as dist
client = dist.Client()
p1     = Process1()
p2     = Process2()
combine= CombineProcesses()

###################### execute ##########################
inputs  = [0,50]
futures = []
for inp in inputs:
    f1 = p1(inp,client,inp)
    f2 = p2(f1,client,inp)
    futures.append(f2)
result = combine(futures[0],client,futures[1])
result = result.result()

Ожидаемый результат:

(0  + 1 )*2 -> 2  -\
                    104
(50 + 1 )*2 ->102 -/

Фактический результат:

(0 + 1 )*2 -> 2 -\
                  4
(0 + 1 )*2 -> 2 -/
...