Распараллелить код во вложенных циклах - PullRequest
5 голосов
/ 05 января 2009

Вы всегда слышите, что функциональный код по своей природе проще распараллелить, чем нефункциональный код, поэтому я решил написать функцию, которая выполняет следующее:

С учетом ввода строк, суммируйте количество уникальных символов для каждой строки. Таким образом, с учетом ввода [ "aaaaa"; "bbb"; "ccccccc"; "abbbc" ] наш метод вернет a: 6; b: 6; c: 8.

Вот что я написал:

(* seq<#seq<char>> -> Map<char,int> *)
let wordFrequency input =
    input
    |> Seq.fold (fun acc text ->
        (* This inner loop can be processed on its own thread *)
        text
        |> Seq.choose (fun char -> if Char.IsLetter char then Some(char) else None)
        |> Seq.fold (fun (acc : Map<_,_>) item ->
            match acc.TryFind(item) with
            | Some(count) -> acc.Add(item, count + 1)
            | None -> acc.Add(item, 1))
            acc
        ) Map.empty

Этот код идеально распараллеливаем, потому что каждая строка в input может обрабатываться в своем собственном потоке. Это не так просто, как кажется, поскольку innerloop добавляет элементы на карту, совместно используемую всеми входными данными.

Я бы хотел, чтобы внутренний цикл был выделен в свой собственный поток, и я не хочу использовать какое-либо изменяемое состояние. Как бы я переписал эту функцию, используя рабочий процесс Async?

Ответы [ 4 ]

3 голосов
/ 13 февраля 2010

Вы можете написать это так:

let wordFrequency =
  Seq.concat >> Seq.filter System.Char.IsLetter >> Seq.countBy id >> Map.ofSeq

и распараллелить его только с двумя дополнительными символами, чтобы использовать модуль PSeq из DLL FSharp.PowerPack.Parallel.Seq вместо обычного модуля Seq:

let wordFrequency =
  Seq.concat >> PSeq.filter System.Char.IsLetter >> PSeq.countBy id >> Map.ofSeq

Например, время, затрачиваемое на вычисление частот из Библии короля Джеймса 5,5 Мб, падает с 4,75 до 0,66 с. Это 8,2-кратное ускорение на этом 8-ядерном компьютере.

2 голосов
/ 05 января 2009

Как уже указывалось, возникает конфликт обновления, если вы пытаетесь заставить разные потоки обрабатывать разные входные строки, поскольку каждый поток может увеличивать счетчик каждой буквы. Вы можете сделать так, чтобы каждый поток создавал свою собственную карту, а затем «складывал все карты», но этот последний шаг может быть дорогостоящим (и не очень подходящим для использования потоков из-за общих данных). Я думаю, что большие входные данные, скорее всего, будут работать быстрее, используя алгоритм, подобный приведенному ниже, где каждый поток обрабатывает различную букву для подсчета (для всех строк во входных данных). В результате каждый поток имеет свой собственный независимый счетчик, поэтому нет конфликтов обновления и нет окончательного шага для объединения результатов. Однако нам требуется предварительная обработка, чтобы обнаружить «набор уникальных букв», и на этом этапе возникает та же проблема конфликта. (На практике вы, вероятно, знаете заранее весь набор символов, например, алфавиты, а затем можете просто создать 26 потоков для обработки az и обойти эту проблему.) В любом случае, по-видимому, в основном вопрос состоит в том, чтобы изучить, «как писать F #». асинхронный код для разделения работы между потоками », поэтому приведенный ниже код демонстрирует это.

#light

let input = [| "aaaaa"; "bbb"; "ccccccc"; "abbbc" |]

// first discover all unique letters used
let Letters str = 
    str |> Seq.fold (fun set c -> Set.add c set) Set.empty 
let allLetters = 
    input |> Array.map (fun str -> 
        async { return Letters str })
    |> Async.Parallel 
    |> Async.Run     
    |> Set.union_all // note, this step is single-threaded, 
        // if input has many strings, can improve this

// Now count each letter on a separate thread
let CountLetter letter =
    let mutable count = 0
    for str in input do
        for c in str do
            if letter = c then
                count <- count + 1
    letter, count
let result = 
    allLetters |> Seq.map (fun c ->
        async { return CountLetter c })
    |> Async.Parallel 
    |> Async.Run

// print results
for letter,count in result do
    printfn "%c : %d" letter count

Я действительно «полностью изменил алгоритм», в основном потому, что исходный алгоритм, который вы использовали, не особенно подходит для прямого распараллеливания данных из-за конфликта обновления. В зависимости от того, что именно вы изучаете, этот ответ может быть вам, а может и не быть, особенно удовлетворительным.

1 голос
/ 14 января 2009

Параллель - не то же самое, что асинхронный, как Дон Сайм объясняет .

Так что, IMO, вам лучше использовать PLINQ для распараллеливания.

0 голосов
/ 05 января 2009

Я плохо говорю на F #, но я могу решить это. Подумайте об использовании карты / уменьшить:

let n = карточка (& Sigma;) - количество символов & sigma; в алфавите и сигме ;.

Этап карты:

Spawn n процессов, где назначением i -го процесса является подсчет количества вхождений символа & sigma; i во всем входном векторе.

Ступень уменьшения :

Соберите сумму по каждому из n процессов по порядку. Этот вектор - ваши результаты.

Теперь эта версия не приводит к каким-либо улучшениям по сравнению с последовательной версией; Я подозреваю, что здесь есть скрытая зависимость, которая затрудняет параллелизацию по своей сути, но я слишком устала и умру, чтобы доказать это сегодня вечером.

...