Как использовать Dask Bag и отложено, чтобы объединить 2 функции отображения? - PullRequest
0 голосов
/ 29 сентября 2018

У меня есть 2 функции: find_components и processing_partition_component

import random
import dask.bag as db

def find_components(partition):
  # it will return a list of components
  return [x for x in range(1, random.randint(1,10))]

def processing_partition_component(part_comp):
  print("processing %s" % part_comp)

partitions=['2','3','4']

Я хочу вычислить find_components () для раздела, а затем получить выходные данные каждого раздела для генерации задач для processing_partition_component ().И вычисление не должно ждать завершения всех find_coponents ().Другими словами, processing_partition_component () должен вызываться сразу после завершения одной из processing_partition. Я пробовал это, но это не то, что я хочу:

db.from_sequence(partitions, partition_size=1).map(find_components).map(processing_partition_component).compute()
# Output:
processing [1, 2, 3, 4, 5]
processing [1, 2]
processing [1, 2, 3, 4, 5, 6, 7, 8, 9]

Вы можете увидеть, что processing_partition_component () принимаетвесь вывод функции find_components (), например: [1, 2, 3, 4, 5] как положено.Что я хочу, так это то, что задача должна разветвляться после find_components (), и каждый processing_partition_component () должен принимать только 1 элемент, например 1, 2, 3, 4 или 5. Ожидаемый вывод -

processing 1
processing 2
processing 3
....
processing 1  # from another output of find_components
...

Еслиэто многопоточный, порядок распечатки будет перепутан, так что Обработка 1 может быть распечатана 3 раза рядом друг с другом

Я не знаю, как это сделать, используя dask.bag и dask.delayed,Я использую последний dask с python3

Спасибо,

1 Ответ

0 голосов
/ 29 сентября 2018

Сумка Dask хорошо справится с генераторами

def f(partition):
    for x in partition:
        yield x + 1

my_bag.map_partitions(f).map(print)

Это добавит один к каждому элементу, а затем распечатает его, прежде чем перейти к следующему элементу

...