Управление асинхронными действиями, которые могут конфликтовать в F # - PullRequest
0 голосов
/ 01 ноября 2018

У меня много действий (Async<T> list) для выполнения в F #. Я могу выполнить большинство этих действий параллельно, но некоторые могут конфликтовать из-за блокировок файлов и т. Д.

Для каждого действия я могу сгенерировать «ключ» (int), который определяет, могут ли действия конфликтовать:

  • Если действие a имеет ключ i, а действие b имеет ключ j и i = j, то a и b могут конфликтовать. Они должны быть выполнены поочередно.
  • Если действие a имеет ключ i, а действие b имеет ключи j и i <> j, то a и b никогда не будут конфликтовать. Они могут выполняться параллельно.

Я хотел бы выполнить свои действия (int * Async<T>) list эффективно и без конфликтов.

Я полагаю, что процесс будет выглядеть примерно так:

  • Группировать все действия по ключу
  • Последовательное объединение каждой группы в одну Async
  • Выполнить каждую цепь параллельно

Как я могу реализовать это в F #?

Как обычно решаются эти проблемы?


Моя попытка полностью последовательной реализации:

let wrapTasks<'T> (tasks : (int * Async<'T>) list) : Async<'T list> = async {
  return
    tasks 
    |> Seq.map (fun (k, t) -> t |> Async.RunSynchronously)
    |> Seq.toList
}

Ответы [ 2 ]

0 голосов
/ 01 ноября 2018

С помощью вспомогательной функции, принимающей обещание для значения x и одно для набора значений acc:

module Async =
    let sequence x acc = async {
        let! x = x
        let! y = acc
        return x :: y
    }

мы можем асинхронно сгруппировать tasks по их «идентификатору блокировки», немного очистить результирующий список и затем sequence каждую группу в один async, который «содержит» список результатов своей группы , Этот список затем обрабатывается параллельно. Как только ts : 'b list [] будет доступен, мы сгладим его:

let wrapTasks tasks = async {
    let! ts =
        tasks
        |> List.groupBy fst
        |> List.map (snd >> List.map snd)
        |> List.map (fun asyncs -> List.foldBack Async.sequence asyncs (async { return [] }))
        |> Async.Parallel
    return ts |> List.ofArray |> List.collect id
}

Это можно проверить, например,

List.init 50 (fun i -> i % 5, async {
    let now = System.DateTime.UtcNow.Ticks
    do! Async.Sleep 10
    return i, now })
|> wrapTasks
|> Async.RunSynchronously
|> List.groupBy snd
|> List.map (fun (t, rs) -> t, rs |> List.map fst)
|> List.sort

Изменяя делитель, мы можем регулировать уровень параллелизма и убеждать себя, что функция работает, как и ожидалось: -)

  [(636766393199727614L, [0; 1; 2; 3; 4]);
   (636766393199962986L, [5; 6; 7; 8; 9]);
   (636766393200068008L, [10; 11; 12; 13; 14]);
   (636766393200278385L, [15; 16; 17; 18; 19]);
   (636766393200382690L, [20; 21; 22; 23; 24]);
   (636766393200597692L, [25; 26; 27; 28; 29]);
   (636766393200703235L, [30; 31; 32; 33; 34]);
   (636766393200918241L, [35; 36; 37; 38; 39]);
   (636766393201027938L, [40; 41; 42; 43; 44]);
   (636766393201133307L, [45; 46; 47; 48; 49])]

Полное раскрытие: мне пришлось выполнить тест несколько раз, чтобы получить этот хороший результат. Обычно цифры немного сбиваются.

0 голосов
/ 01 ноября 2018

Это возможное решение:

let wrapTasks (tasks : (int * Async<'T>) list) =
    tasks
    |> List.groupBy fst 
    |> Seq.map (fun (k, ts) -> async {
        for (i, t) in ts do
            let! r = t
            ()
    })
    |> Async.Parallel
    |> Async.RunSynchronously
...