У меня есть 2 функции: find_components и processing_partition_component
import random
import dask.bag as db
def find_components(partition):
# it will return a list of components
return [x for x in range(1, random.randint(1,10))]
def processing_partition_component(part_comp):
print("processing %s" % part_comp)
partitions=['2','3','4']
Я хочу вычислить find_components () для раздела, а затем получить выходные данные каждого раздела для генерации задач для processing_partition_component ().И вычисление не должно ждать завершения всех find_coponents ().Другими словами, processing_partition_component () должен вызываться сразу после завершения одной из processing_partition. Я пробовал это, но это не то, что я хочу:
db.from_sequence(partitions, partition_size=1).map(find_components).map(processing_partition_component).compute()
# Output:
processing [1, 2, 3, 4, 5]
processing [1, 2]
processing [1, 2, 3, 4, 5, 6, 7, 8, 9]
Вы можете увидеть, что processing_partition_component () принимаетвесь вывод функции find_components (), например: [1, 2, 3, 4, 5] как положено.Что я хочу, так это то, что задача должна разветвляться после find_components (), и каждый processing_partition_component () должен принимать только 1 элемент, например 1, 2, 3, 4 или 5. Ожидаемый вывод -
processing 1
processing 2
processing 3
....
processing 1 # from another output of find_components
...
Еслиэто многопоточный, порядок распечатки будет перепутан, так что Обработка 1 может быть распечатана 3 раза рядом друг с другом
Я не знаю, как это сделать, используя dask.bag и dask.delayed,Я использую последний dask с python3
Спасибо,