Параллельно существует функция в F # - PullRequest
3 голосов
/ 16 декабря 2011

Мотивация

У меня есть долговременная логическая функция, которая должна выполняться в массиве, и я хочу немедленно вернуться, если элемент в массиве удовлетворяет условию. Я хотел бы выполнить поиск параллельно и завершить другие потоки, когда первый завершенный поток возвращает правильный ответ.

Вопрос

Каков хороший способ реализовать функцию параллельного существования в F #? Поскольку моя цель - производительность, эффективное решение предпочтительнее простого или идиоматического.

Контрольный пример

Предположим, я хочу узнать, существует ли одно значение в массиве или нет. А функция сравнения (equals) моделируется как вычислительно-дорогая:

open System.Diagnostics
open System.Threading

// Source at http://parallelpatterns.codeplex.com/releases/view/50473
let doCpuIntensiveOperation seconds (token:CancellationToken) throwOnCancel =
        if (token.IsCancellationRequested) then
            if (throwOnCancel) then token.ThrowIfCancellationRequested()
            false
        else
            let ms = int64 (seconds * 1000.0)
            let sw = new Stopwatch()
            sw.Start()
            let checkInterval = Math.Min(20000000, int (20000000.0 * seconds))

            // Loop to simulate a computationally intensive operation
            let rec loop i = 
                // Periodically check to see if the user has requested 
                // cancellation or if the time limit has passed
                let check = seconds = 0.0 || i % checkInterval = 0
                if check && token.IsCancellationRequested then
                    if throwOnCancel then token.ThrowIfCancellationRequested()
                    false
                elif check && sw.ElapsedMilliseconds > ms then
                    true
                else 
                  loop (i + 1)

            // Start the loop with 0 as the first value
            loop 0

let inline equals x y =
    doCpuIntensiveOperation 0.01 CancellationToken.None false |> ignore
    x = y

Массив состоит из 1000 случайно сгенерированных элементов, и значение поиска гарантируется во 2-й половине массива (поэтому последовательный поиск должен проходить хотя бы половину массива):

let rand = new System.Random()
let m = 1000
let N = 1000000
let xs = [|for _ in 1..m -> rand.Next(N)|]
let i = rand.Next((m-1)/2, m-1);;

#time "on";;
let b1 = parallelExists (equals xs.[i]) xs;; // Parallel
let b2 = Array.exists (equals xs.[i]) xs;; // Sequential

Ответы [ 2 ]

3 голосов
/ 16 декабря 2011

Я думаю, что вы можете предпринять следующие шаги:

  1. Создает несколько рабочих (потоков или асинхронных вычислений) и передает каждому равный фрагмент массива и токен отмены, который будет использоваться всеми рабочими

  2. Когда работник находит искомый элемент, он вызывает Cancel на токене (каждый работник должен проверять состояние отмены токена на каждой итерации и при необходимости получать залог)

В настоящее время у меня нет времени, чтобы написать код, поэтому я могу опустить некоторые детали.

Этот ответ и связанный с ним вопрос могут быть полезны.

UPDATE

Это пример того, о чем я думаю

open System
open System.Collections.Generic
open System.Threading
open System.Threading.Tasks

let getChunks size array =
  let rec loop s n =
    seq {
      if n > 0 then
        let r = n - size
        if r > 0 then yield (s, size); yield! loop (s + size) r
        else yield (s, size + r)
    }      
  loop 0 (Array.length array)

[<Literal>]
let CHUNK_SIZE = 3

let parallelExists f (array:_[]) =
  use cts = new CancellationTokenSource()
  let rec checkSlice i n = 
    if n > 0 && not cts.IsCancellationRequested then
      if f array.[i] then cts.Cancel()
      else checkSlice (i + 1) (n - 1)
  let workers =
    array
    |> getChunks CHUNK_SIZE
    |> Seq.map (fun (s, c) -> Task.Factory.StartNew(fun () -> checkSlice s c))
    |> Seq.toArray
  try 
    Task.WaitAll(workers, cts.Token)
    false
  with :? OperationCanceledException -> true

Использование

let array = Array.init 10 id
let exists = 
  array |> parallelExists (fun i ->
    Thread.Sleep(500)
    i = 9)
printfn "%b" exists //true
1 голос
/ 16 декабря 2011

F # Powerpack имеет PSeq.exists, который отображается в PLINQ ParallelEnumerable.Any, который является частью BCL.Есть также ParallelEnumerable.First

Я пытался декомпилировать, но не сразу понял, что происходит.Поэтому вместо этого я выполнил следующий побочный код, чтобы убедиться, что он использует своего рода отмену, как только обнаружил элемент:

let elems = seq {
    for x = 0 to 1000000 do
        printfn "test"
        yield x }

open System
open System.Linq;;

ParallelEnumerable.First (ParallelEnumerable.AsParallel(elems), Func<_,_>(fun x -> x = 1))
...