F # Асинхронные действия с конфликтами / упорядочением / мьютексом - PullRequest
0 голосов
/ 01 ноября 2018

F # облегчает определение асинхронных вычислений с помощью компоновщика async. Вы можете написать целую программу и затем передать ее Async.RunSynchronously.

У меня проблема в том, что некоторые async действия не должны выполняться одновременно; они должны быть вынуждены ждать выполнения других действий async. Это что-то вроде мьютекса. Однако я не хочу просто связывать их последовательно, потому что это было бы неэффективно.

Конкретный пример: Скачать кэш

Предположим, я хочу получить некоторые удаленные файлы, используя локальный файловый кеш. В моем приложении я вызываю fetchFile : Async<string> во многих местах, но есть риск, что если я вызову fetchFile по одному и тому же URL в одно и то же время, кэш будет поврежден из-за нескольких записей. Вместо этого команда fetchFile должна иметь такое поведение:

  • Если кеша нет, загрузите файл в кеш, а затем прочитайте содержимое кеша
  • Если в данный момент выполняется запись в кэш, дождитесь окончания записи, а затем прочитайте содержимое
  • Если кеш присутствует и заполнен, просто прочитайте содержимое кеша
  • fetchFile на двух разных URL должны работать параллельно

Я представляю какой-то класс с состоянием DownloadManager, в который можно отправлять и заказывать запросы внутри страны.

Как программисты F # обычно реализуют такую ​​логику с async?


Воображаемое использование:

let dm = new DownloadManager()

let urls = [
  "https://www.google.com"; 
  "https://www.google.com"; 
  "https://www.wikipedia.org"; 
  "https://www.google.com"; 
  "https://www.bing.com"; 
]

let results = 
  urls
  |> Seq.map dm.Download
  |> Async.Parallel
  |> Async.RunSynchronously

Примечание: Ранее я задавал этот вопрос ранее о том, как выполнять async действия в полупараллельном режиме, но теперь я понял, что этот подход трудно составить.

Примечание: мне не нужно беспокоиться о одновременной работе нескольких экземпляров приложения. Блокировка в памяти достаточно.

Ответы [ 3 ]

0 голосов
/ 01 ноября 2018

Я согласен с @AMieres, что процессор почтовых ящиков - хороший способ сделать это. Моя версия кода несколько менее общая - она ​​использует процессор почтовых ящиков непосредственно для этой цели, и поэтому она может быть немного проще.

Наш процессор почтовых ящиков имеет только одно сообщение - вы просите его загрузить URL-адрес, и он возвращает вам асинхронный рабочий процесс, который вы можете ждать, чтобы получить свой результат:

type DownloadMessage = 
  | Download of string * AsyncReplyChannel<Async<string>>

Нам нужна вспомогательная функция для асинхронной загрузки URL:

let asyncDownload url = async {
  let wc = new System.Net.WebClient()
  printfn "Downloading: %s" url
  return! wc.AsyncDownloadString(System.Uri(url)) }

В процессоре почтовых ящиков мы сохраняем изменчивый cache (это нормально, потому что процессор почтовых ящиков обрабатывает сообщения синхронно). Когда мы получаем запрос на загрузку, мы проверяем, есть ли у нас загрузка в кэше - если нет, мы запускаем загрузку как дочерний элемент async и добавляем его в кэш - таким образом, кэш содержит асинхронные рабочие процессы, которые представляют результаты выполнения скачать.

let downloadCache = MailboxProcessor.Start(fun inbox -> async {
  let cache = System.Collections.Generic.Dictionary<_, _>()
  while true do
    let! (Download(url, repl)) = inbox.Receive()
    if not (cache.ContainsKey url) then 
      let! proc = asyncDownload url |> Async.StartChild
      cache.Add(url, proc)
    repl.Reply(cache.[url]) })

Чтобы фактически загрузить с использованием кэша, мы просто отправляем запрос обработчику почтового ящика, а затем ждем результата возвращенного рабочего процесса (который может совместно использоваться несколькими запросами).

let downloadUsingCache url = async {
  let! res = downloadCache.PostAndAsyncReply(fun ch -> Download(url, ch))
  return! res }
0 голосов
/ 01 ноября 2018

Я предлагаю вам упрощенную версию, основанную на ответе @Tomas Petricek.


Предположим, что у нас есть функция загрузки, которая с учетом URL возвращает Async<string>. Это фиктивная версия:

let asyncDownload url = 
    async { 
        let started = System.DateTime.UtcNow.Ticks
        do! Async.Sleep 30
        let finished = System.DateTime.UtcNow.Ticks
        let r = sprintf "Downloaded  %A it took: %dms %s" (started / 10000L) ((finished - started) / 10000L) url
        printfn "%s" r
        return r
    }

Здесь у нас есть несколько простых обобщенных Mailbox вспомогательных функций в их собственном модуле:

module Mailbox =
    let iterA hndl f =
        MailboxProcessor.Start(fun inbox ->
            async {
                while true do
                    try       let!   msg = inbox.Receive()
                              do!  f msg
                    with e -> hndl e
            }
        )
    let callA hndl f = iterA hndl (fun ((replyChannel: AsyncReplyChannel<_>), msg) -> async {
        let! r = f msg
        replyChannel.Reply r
    })
    let call hndl f = callA hndl (fun msg -> async { return f msg } )

Цель этой «библиотеки» - упростить более типичное использование MailboxProcessor. Хотя это выглядит сложным и трудным для понимания, важно то, что делают функции и как их использовать. В частности, мы собираемся использовать Mailbox.call, который возвращает агента почтового ящика, способного вернуть значение. Это подпись:

val call: 
   hndl: exn -> unit ->
   f   : 'a -> 'b    
      -> MailboxProcessor<AsyncReplyChannel<'b> * 'a>

Первый параметр - это обработчик исключений, а второй - функция, которая возвращает значение. Вот как мы определяем нашу downloadManager:

let downloadManager = 
    let dict = new System.Collections.Generic.Dictionary<string, _>()
    Mailbox.call (printfn "%A") (fun url ->         
        if dict.ContainsKey url then dict.[url] else
        let result = asyncDownload url |> Async.StartChild |> Async.RunSynchronously
        dict.Add(url, result)
        result
    )

Наш кеш - это Dictionary. Если URL-адрес отсутствует, мы вызываем asyncDownload и запускаем его как дочерний процесс. Используя Async.StartChild, нам не нужно ждать окончания загрузки, мы просто возвращаем async, который ожидает его завершения.

Для вызова менеджера мы используем downloadManager.PostAndReply

let downloadUrl url = downloadManager.PostAndReply(fun reply -> reply, url)

А вот и тест:

let s = System.DateTime.UtcNow.Ticks
printfn "started %A" (s / 10000L)
let res = 
    List.init 50 (fun i -> i, downloadUrl (string <| i % 5) )
    |> List.groupBy (snd >> Async.RunSynchronously)
    |> List.map (fun (t, ts) -> sprintf "%s - %A" t (ts |> List.map fst ) )

let f = System.DateTime.UtcNow.Ticks
printfn "finish  %A" (f / 10000L)

printfn "elapsed %dms" ((f - s) / 10000L)

res |> printfn "Result: \n%A"

Это производит:

started 63676682503885L
Downloaded  63676682503911L it took: 34ms 1
Downloaded  63676682503912L it took: 33ms 2
Downloaded  63676682503911L it took: 37ms 0
Downloaded  63676682503912L it took: 33ms 3
Downloaded  63676682503912L it took: 33ms 4
finish  63676682503994L
elapsed 109ms
Result: 
["Downloaded  63676682503911L it took: 37ms 0 - [0; 5; 10; 15; 20; 25; 30; 35; 40; 45]";
 "Downloaded  63676682503911L it took: 34ms 1 - [1; 6; 11; 16; 21; 26; 31; 36; 41; 46]";
 "Downloaded  63676682503912L it took: 33ms 2 - [2; 7; 12; 17; 22; 27; 32; 37; 42; 47]";
 "Downloaded  63676682503912L it took: 33ms 3 - [3; 8; 13; 18; 23; 28; 33; 38; 43; 48]";
 "Downloaded  63676682503912L it took: 33ms 4 - [4; 9; 14; 19; 24; 29; 34; 39; 44; 49]"]
0 голосов
/ 01 ноября 2018

UPDATE

Лучше, чем ленивое значение, Async.StartChild, предложенное Петричеком, поэтому я изменил lazyDownload на asyncDownload


Вы можете использовать MailboxProcessor в качестве менеджера загрузки, который обрабатывает кэш. MailboxProcessor - это структура в F #, которая обрабатывает очередь сообщений, не допуская коллизий.

Сначала вам нужен процессор, способный поддерживать состояние:

let stateFull hndl initState =
    MailboxProcessor.Start(fun inbox ->
        let rec loop state : Async<unit> = async {
            try         let! f        = inbox.Receive()
                        let! newState = f state
                        return! loop newState
            with e ->   return! loop (hndl e state)
        }
        loop initState
    )

Первый параметр - это обработчик ошибок, второй - начальное состояние, в данном случае Map<string, Async<string>>. Это наш downloadManager:

let downloadManager = 
    stateFull (fun e s -> printfn "%A" e ; s) (Map.empty : Map<string, _>)

Для вызова MailBox нам нужно использовать PostAndReply:

let applyReplyS f (agent: MailboxProcessor<'a->Async<'a>>) = 
    agent.PostAndReply(fun (reply:AsyncReplyChannel<'r>) -> 
        fun v -> async {
            let st, r = f v
            reply.Reply r
            return st 
        })

Эта функция ожидает функцию папки, которая проверяет кэш и добавляет Async<string>, если ничего не найдено, и возвращает обновленный кеш.

Сначала функция asyncDownload:

let asyncDownload url = 
    async { 
        let started = System.DateTime.UtcNow.Ticks
        do! Async.Sleep 30
        let finished = System.DateTime.UtcNow.Ticks
        let r = sprintf "Downloaded  %A it took: %dms %s" (started / 10000L) ((finished - started) / 10000L) url
        printfn "%s" r
        return r
    }

Просто фиктивная функция, которая возвращает строку и информацию о времени.

Теперь функция папки, которая проверяет кеш:

let folderCache url cache  =
    cache 
    |> Map.tryFind url
    |> Option.map(fun ld -> cache, ld)
    |> Option.defaultWith (fun () -> 
        let ld = asyncDownload url |> Async.StartChild |> Async.RunSynchronously
        cache |> Map.add url ld, ld
    )

наконец, наша функция загрузки:

let downloadUrl url =
    downloadManager 
    |> applyReplyS (folderCache url)

// val downloadUrl: url: string -> Async<string>

Тестирование

let s = System.DateTime.UtcNow.Ticks
printfn "started %A" (s / 10000L)
let res = 
    List.init 50 (fun i -> i, downloadUrl (string <| i % 5) )
    |> List.groupBy (snd >> Async.RunSynchronously)
    |> List.map (fun (t, ts) -> sprintf "%s - %A" t (ts |> List.map fst ) )

let f = System.DateTime.UtcNow.Ticks
printfn "finish  %A" (f / 10000L)

printfn "elapsed %dms" ((f - s) / 10000L)

res |> printfn "Result: \n%A"

выдает следующий вывод:

started 63676683215256L
Downloaded  63676683215292L it took: 37ms "2"
Downloaded  63676683215292L it took: 36ms "3"
Downloaded  63676683215292L it took: 36ms "1"
Downloaded  63676683215291L it took: 38ms "0"
Downloaded  63676683215292L it took: 36ms "4"
finish  63676683215362L
elapsed 106ms
Result: 
["Downloaded  63676683215291L it took: 38ms "0" - [0; 5; 10; 15; 20; 25; 30; 35; 40; 45]";
 "Downloaded  63676683215292L it took: 36ms "1" - [1; 6; 11; 16; 21; 26; 31; 36; 41; 46]";
 "Downloaded  63676683215292L it took: 37ms "2" - [2; 7; 12; 17; 22; 27; 32; 37; 42; 47]";
 "Downloaded  63676683215292L it took: 36ms "3" - [3; 8; 13; 18; 23; 28; 33; 38; 43; 48]";
 "Downloaded  63676683215292L it took: 36ms "4" - [4; 9; 14; 19; 24; 29; 34; 39; 44; 49]"]
...