«Задушил» асинхронную загрузку в F # - PullRequest
6 голосов
/ 03 июня 2011

Я пытаюсь загрузить более 3000 фотографий, на которые есть ссылки из резервной копии xml моего блога.Проблема, с которой я столкнулся, заключается в том, что если одна из этих фотографий больше не доступна, весь асинхронный процесс блокируется, потому что AsyncGetResponse не делает тайм-ауты.

ildjarn помог мне собратьверсия AsyncGetResponse, которая завершается с ошибкой по тайм-ауту, но ее использование дает много больше тайм-аутов - как если бы запросы были только в очереди.Кажется, что все веб-запросы запускаются «немедленно», единственный способ заставить его работать, это установить тайм-аут на время, необходимое для загрузки всех их вместе взятых: что не очень хорошо, потому что это означает, что янеобходимо настроить время ожидания в зависимости от количества изображений.

Достиг ли я предела ванили async?Стоит ли мне вместо этого смотреть на реактивные расширения?

Это немного смущает, потому что я уже задал два вопросов здесь об этом конкретном битекод, и у меня до сих пор не работает так, как я хочу!

Ответы [ 4 ]

9 голосов
/ 03 июня 2011

Я думаю, что должен быть лучший способ узнать, что файл недоступен, чем использовать тайм-аут. Я не совсем уверен, но есть ли способ заставить его вызвать исключение, если файл не может быть найден? Тогда вы можете просто обернуть свой код async в try .. with, и вам следует избегать большинства проблем.

В любом случае, если вы хотите написать свой собственный «менеджер параллелизма», который выполняет определенное количество запросов параллельно и ставит в очередь оставшиеся ожидающие запросы, тогда в F # проще всего использовать агенты (тип MailboxProcessor). Следующий объект инкапсулирует поведение:

type ThrottlingAgentMessage = 
  | Completed
  | Work of Async<unit>

/// Represents an agent that runs operations in concurrently. When the number
/// of concurrent operations exceeds 'limit', they are queued and processed later
type ThrottlingAgent(limit) = 
  let agent = MailboxProcessor.Start(fun agent -> 
    /// Represents a state when the agent is blocked
    let rec waiting () = 
      // Use 'Scan' to wait for completion of some work
      agent.Scan(function
        | Completed -> Some(working (limit - 1))
        | _ -> None)
    /// Represents a state when the agent is working
    and working count = async { 
      while true do
        // Receive any message 
        let! msg = agent.Receive()
        match msg with 
        | Completed -> 
            // Decrement the counter of work items
            return! working (count - 1)
        | Work work ->
            // Start the work item & continue in blocked/working state
            async { try do! work 
                    finally agent.Post(Completed) }
            |> Async.Start
            if count < limit then return! working (count + 1)
            else return! waiting () }
    working 0)      

  /// Queue the specified asynchronous workflow for processing
  member x.DoWork(work) = agent.Post(Work work)
6 голосов
/ 03 июня 2011

Нет ничего проще. :)

Я думаю, что проблемы, с которыми вы сталкиваетесь, являются неотъемлемой частью проблемной области (в отличие от простых проблем с моделью асинхронного программирования, хотя они в некоторой степени взаимодействуют).

Скажем, вы хотите загрузить 3000 фотографий. Во-первых, в вашем .NET-процессе есть что-то вроде System.Net.ConnectionLimit или что-то, имя которого я забыл, например, ограничьте число одновременных HTTP-соединений, которые ваш процесс .NET может запустить одновременно (и я думаю, что по умолчанию используется значение «2»). Таким образом, вы можете найти этот элемент управления и установить его на большее число, и это поможет.

Но затем, ваша машина и интернет-соединение имеют ограниченную пропускную способность. Таким образом, даже если вы попытаетесь одновременно запустить 3000 HTTP-соединений, каждое отдельное соединение будет работать медленнее в зависимости от ограничений канала пропускной способности. Так что это также будет взаимодействовать с таймаутами. (И это даже не учитывает, какие виды дросселей / лимитов на сервере. Может быть, если вы отправите 3000 запросов, он подумает, что вы атакуете DoS и внесет в черный список ваш IP.)

Таким образом, это действительно проблемная область, где хорошее решение требует некоторого интеллектуального регулирования и управления потоком для управления использованием основных системных ресурсов.

Как и в другом ответе, агенты F # (MailboxProcessors) являются хорошей моделью программирования для создания такой логики регулирования / регулирования потока.

(Даже при том, что, если большинство файлов изображений имеют размер 1 МБ, но в них есть файл 1 ГБ, этот единственный файл может отключиться по таймауту.)

Во всяком случае, это не столько ответ на вопрос, сколько просто указание на ту внутреннюю сложность, которая существует в самой проблемной области. (Возможно, это также наводит на мысль о том, почему «менеджеры загрузок» пользовательского интерфейса так популярны.)

4 голосов
/ 14 июня 2011

Вот вариант ответа Томаса, потому что мне был нужен агент, который мог бы вернуть результаты.

type ThrottleMessage<'a> = 
    | AddJob of (Async<'a>*AsyncReplyChannel<'a>) 
    | DoneJob of ('a*AsyncReplyChannel<'a>) 
    | Stop

/// This agent accumulates 'jobs' but limits the number which run concurrently.
type ThrottleAgent<'a>(limit) =    
    let agent = MailboxProcessor<ThrottleMessage<'a>>.Start(fun inbox ->
        let rec loop(jobs, count) = async {
            let! msg = inbox.Receive()  //get next message
            match msg with
            | AddJob(job) -> 
                if count < limit then   //if not at limit, we work, else loop
                    return! work(job::jobs, count)
                else
                    return! loop(job::jobs, count)
            | DoneJob(result, reply) -> 
                reply.Reply(result)           //send back result to caller
                return! work(jobs, count - 1) //no need to check limit here
            | Stop -> return () }
        and work(jobs, count) = async {
            match jobs with
            | [] -> return! loop(jobs, count) //if no jobs left, wait for more
            | (job, reply)::jobs ->          //run job, post Done when finished
                async { let! result = job 
                        inbox.Post(DoneJob(result, reply)) }
                |> Async.Start
                return! loop(jobs, count + 1) //job started, go back to waiting
        }
        loop([], 0)
    )
    member m.AddJob(job) = agent.PostAndAsyncReply(fun rep-> AddJob(job, rep))
    member m.Stop() = agent.Post(Stop)

В моем конкретном случае, мне просто нужно использовать его в качестве «карты», поэтому я добавил статическую функцию:

    static member RunJobs limit jobs = 
        let agent = ThrottleAgent<'a>(limit)
        let res = jobs |> Seq.map (fun job -> agent.AddJob(job))
                       |> Async.Parallel
                       |> Async.RunSynchronously
        agent.Stop()
        res

Это кажется, работает нормально ...

0 голосов
/ 05 июля 2017

Вот готовое решение:

FSharpx.Control предлагает функцию Async.ParallelWithThrottle .Я не уверен, что это лучшая реализация, так как использует SemaphoreSlim.Но простота использования велика, и, поскольку моему приложению не нужна максимальная производительность, оно работает достаточно хорошо для меня.Хотя, поскольку это библиотека, если кто-то знает, как ее улучшить, всегда приятно сделать из библиотеки лучшие исполнители из коробки, чтобы остальные могли просто использовать работающий код и просто выполнить свою работу!*

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...