У меня есть поток, похожий на:
all_feed_ids
|> Flow.from_enumerable
|> Flow.map(&download_feed)
|> Flow.flat_map(&extract_feed) # Creates many items per feed
# |> Flow.map_state(&bulk_insert_items)
|> Flow.map(&download_item)
|> Flow.flat_map(&extract_item) # Creates many sub-items per item
# |> Flow.map(&bulk_insert_subitem)
В этой теме предлагается использовать Flow.map_state
, но он больше не существует.
КакВ результате я использую следующее вместо каждого Flow.map_state:
|> Flow.partition(window: Flow.Window.count(@bulk_insert_size), stages: 1)
|> Flow.reduce(fn -> [] end, fn item, list ->
[item | list]
end)
|> Flow.on_trigger(&bulk_insert_items)
|> Flow.partition()
Это разумный подход или есть другая функция или предпочтительный способ сделать это с Flow (или это может быть дажене подходящий вариант использования для потока)?