С помощью вспомогательной функции, принимающей обещание для значения x
и одно для набора значений acc
:
module Async =
let sequence x acc = async {
let! x = x
let! y = acc
return x :: y
}
мы можем асинхронно сгруппировать tasks
по их «идентификатору блокировки», немного очистить результирующий список и затем sequence
каждую группу в один async
, который «содержит» список результатов своей группы , Этот список затем обрабатывается параллельно. Как только ts : 'b list []
будет доступен, мы сгладим его:
let wrapTasks tasks = async {
let! ts =
tasks
|> List.groupBy fst
|> List.map (snd >> List.map snd)
|> List.map (fun asyncs -> List.foldBack Async.sequence asyncs (async { return [] }))
|> Async.Parallel
return ts |> List.ofArray |> List.collect id
}
Это можно проверить, например,
List.init 50 (fun i -> i % 5, async {
let now = System.DateTime.UtcNow.Ticks
do! Async.Sleep 10
return i, now })
|> wrapTasks
|> Async.RunSynchronously
|> List.groupBy snd
|> List.map (fun (t, rs) -> t, rs |> List.map fst)
|> List.sort
Изменяя делитель, мы можем регулировать уровень параллелизма и убеждать себя, что функция работает, как и ожидалось: -)
[(636766393199727614L, [0; 1; 2; 3; 4]);
(636766393199962986L, [5; 6; 7; 8; 9]);
(636766393200068008L, [10; 11; 12; 13; 14]);
(636766393200278385L, [15; 16; 17; 18; 19]);
(636766393200382690L, [20; 21; 22; 23; 24]);
(636766393200597692L, [25; 26; 27; 28; 29]);
(636766393200703235L, [30; 31; 32; 33; 34]);
(636766393200918241L, [35; 36; 37; 38; 39]);
(636766393201027938L, [40; 41; 42; 43; 44]);
(636766393201133307L, [45; 46; 47; 48; 49])]
Полное раскрытие: мне пришлось выполнить тест несколько раз, чтобы получить этот хороший результат. Обычно цифры немного сбиваются.