Лучшее использование F # Asyn c maxDegreeOfParallelism - PullRequest
0 голосов
/ 25 мая 2020

Я уже некоторое время использую рабочий процесс F # Asyn c и мне это очень нравится. Недавно я работаю над проектом, включающим множество независимых вычислений на отдельном кадре Deedle Frame, который я использую Async.Parallel. И в этих асинхронных режимах многие другие асинхронные методы также вычисляются параллельно (вложенный Asyn c?). Мой код выглядит примерно так:

let processRoomAsync room = 
    async{
        let x = do_something room
        return x
    }

let processCompressorAsync compressor = 
    async{
        let rooms = get_rooms_from_compressor compressor
        let! y = 
            Array.map processRoomAsync rooms
            |> Async.Parallel
        return do_something_with_y y
    }

let processBuilding building = 
    let compressors = get_compressors_for_building building
    let processResult = 
        Array.map processCompressorAsync compressors
        |> Async.Parallel
        |> Async.RunSynchronously
    do_something_with_result processResult

Идея приведенного выше кода заключается в том, что у меня есть building, например, отель, с множеством A C compressors, каждый compressor обслуживает множество rooms. Мне нужно обработать все rooms данные, которые подключаются к одному compressor, чтобы смоделировать эту compressor мощность. Затем все данные compressors агрегируются, чтобы получить общий результат building.

Когда количество Asyn c мало, результат ожидается при сравнении с временем выполнения одного asyn c.

Однако, когда количество asyn c больше 100, я замечаю значительное ухудшение вычислительного времени. Я прочитал какой-то документ о maxDegreeOfParallelism, но не совсем понимаю, какой номер мне следует использовать. Должно ли это быть количество виртуальных ЦП на моем компьютере? Если да, то как насчет вложенных Async.Parallel? Я пробовал несколько значений, и улучшение было минимальным.

Я где-то читал, что мне следует использовать MailboxProcessor, но не совсем понимаю этого. Кроме того, в некоторой документации говорится, что async предназначен для ввода-вывода, а не для вычислений, и этот сообщение , кажется, предлагает использовать Hopac, но мне интересно, стоит ли тратить время на изучение этого?

Спасибо и извините за длинный вопрос.

1 Ответ

0 голосов
/ 25 мая 2020

Возможные проблемы:

  • Вы выполняете больше потоков, чем ваш компьютер способен обработать, поэтому он приостанавливает один поток, чтобы вернуться к другому, и так далее. Здесь maxDegreeOfParallelism может помочь установить его на <= количество логических процессоров, которые у вас есть. Предполагается, что все ваши вычисления находятся в памяти. Я не думаю, что вам следует использовать вложенные Parallels, цель состоит в том, чтобы ваши общие потоки были <= количеству логических процессоров, которые у вас есть. </p>

  • Другая проблема заключается в том, что ваши операции могут выполняться в конфликты, например, они пытаются использовать те же ресурсы ввода-вывода, поэтому им нужно подождать, пока они не будут освобождены. Чем больше операций вы добавляете параллельно, тем больше им нужно ждать, поскольку пропускная способность ресурса ввода-вывода, например базы данных, ограничена. Сериализация ввода-вывода может помочь, например, зачислить все операции ввода-вывода в очередь, например, обработчик почтовых ящиков, чтобы не было разногласий. В конечном счете, лучший способ исправить ситуацию - устранить узкое место ввода-вывода.

Возможно, что-то вроде этого могло бы помочь:

  let yProcessor = MailboxProcessor.Start(fun inbox ->
        let rec messageLoop acc = async{

            let! msg = inbox.Receive()

            let result = do_something_with_y msg

            return! messageLoop acc::result
            }

        messageLoop ([])
        )


let processCompressorAsync compressor = 
   get_rooms_from_compressor compressor
   |>  Array.map do_something

let processBuilding building = 
    let compressors = get_compressors_for_building building
    let preComputation = 
        compressors
        |> Array.map processCompressorAsync
        |> Async.Parallel
        |> Async.RunSynchronously

    let processResult = 
        preComputation
        |> Array.iter (yProcessor.Post)
        |> Async.Parallel
        |> Async.RunSynchronously
...