Чтобы уменьшить объем кода, который мне нужно написать, у меня есть родительский класс, отправляющий функции клиентам dask, а дети реализуют детали функции, переопределяя метод. Кажется, Dask перезаписывает / удаляет задачи, потому что они имеют одинаковое «имя». Я не уверен, как заставить dask кратко назвать каждую задачу, не перезаписывая предыдущие.
мой общий график заданий на поставку выглядит следующим образом:
input1 ----> task1 ----> task2 ---->\
||=>combine---->||=> combine
input2 ----> task1 ----> task2 ---->/ /
/
input3 ----> task1 ----> task2 ------------------>/
но наблюдаемое поведение:
input1 ----> task1 ----> task2 ---->\
||=>combine---->
input1 ----> task1 ----> task2 ---->/
(обратите внимание на изменение ввода !!)
Один из рабочих, но очень неидеальных подходов заключается в том, чтобы дочерние классы реализовывали метод key_generator () на основе входных данных проблемы. что-то вроде
class ChildClass(ParentClass):
METHOD_NAME='child1'
def __init__(self):
super().__init__()
def _generate_key(self,input):
return '{}{}'.format(self.METHOD_NAME,input)
Однако на панели инструментов dask это приводит к тому, что каждый уникальный вызов метода + параметра обрабатывается как отдельная функция. Это работает, но становится трудно отследить, сколько выполнений осталось на процесс.
Вот конкретный пример этого в действии:
######################### class defs ##########################
class Superclass:
def __call__(self, future, dask_client, inputs):
key = self._get_process_key()
return dask_client.submit(self._execute,future,inputs,key=key)
def _execute(self,future,*args):
return None
def _get_process_key(self):
return None
class Process1(Superclass):
def __init__(self):
super().__init__()
def _execute(self,future,inputs):
future+=1
return future
def _get_process_key(self):
return 'process1'
class Process2(Superclass):
def __init__(self):
super().__init__()
def _execute(self,future,inputs):
future*=2
return future
def _get_process_key(self):
return 'process2'
class CombineProcesses(Superclass):
def __init__(self):
super().__init__()
def _execute(self,future1,future2):
combined = future1 + future2
return combined
def _get_process_key(self):
return 'combine'
###################### instantiate ######################
import dask.distributed as dist
client = dist.Client()
p1 = Process1()
p2 = Process2()
combine= CombineProcesses()
###################### execute ##########################
inputs = [0,50]
futures = []
for inp in inputs:
f1 = p1(inp,client,inp)
f2 = p2(f1,client,inp)
futures.append(f2)
result = combine(futures[0],client,futures[1])
result = result.result()
Ожидаемый результат:
(0 + 1 )*2 -> 2 -\
104
(50 + 1 )*2 ->102 -/
Фактический результат:
(0 + 1 )*2 -> 2 -\
4
(0 + 1 )*2 -> 2 -/