Как программировать параллельные списки на основе потоков? - PullRequest
4 голосов
/ 05 ноября 2010

Мне нужно в качестве примера, как запрограммировать параллельную функцию iter, используя ocaml-thread.Моя первая идея состояла в том, чтобы иметь функцию, подобную этой:

let procs = 4 ;;

let rec _part part i lst =   match lst with
      [] -> ()
    | hd::tl -> 
        let idx = i mod procs in
        (* Printf.printf "part idx=%i\n" idx; *)
        let accu = part.(idx) in
          part.(idx) <- (hd::accu);
          _part part (i+1) tl ;;

Тогда параллельный итер может выглядеть следующим образом (здесь как вариант на основе процесса):

let iter f lst =   let part = Array.create procs [] in
    _part part 0 lst;
    let rec _do i =
      (* Printf.printf "do idx=%i\n" i; *)
      match Unix.fork () with
          0 -> (* Code of child *)
            if i < procs then 
              begin
                (* Printf.printf "child %i\n" i; *)
                List.iter f part.(i) 
              end
        | pid -> (* Code of father *)
            (* Printf.printf "father %i\n" i; *)
            if i >= procs then ignore (Unix.waitpid [] pid)
            else _do (i+1)
    in
      _do 0 ;;

Поскольку использованиеThread-module немного отличается, как бы я кодировал это, используя потоковый модуль ocaml?

И еще один вопрос, функция _part () должна просканировать весь список, чтобы разделить их на n частей, а затемкаждая часть будет проходить через каждый собственный процесс (здесь).Все еще существует решение без разделения списка?

Ответы [ 2 ]

3 голосов
/ 05 ноября 2010

Если у вас есть функция, которая обрабатывает список, и вы хотите запустить ее в нескольких списках независимо, вы можете вызвать Thread.create с этой функцией и каждым списком.Если вы храните свои списки в массиве part, то:

let threads = Array.map (Thread.create (List.iter f)) part in
Array.iter Thread.join threads

Потоки INRIA OCaml не являются фактическими потоками: в каждый момент времени выполняется только один поток, что означает, что если у вас четыре процессора и четыре потока, всечетыре потока будут использовать один и тот же процессор, а остальные три останутся неиспользованными.

Где потоки полезны, так это то, что они по-прежнему допускают асинхронное программирование: некоторые примитивы модуля Thread могут ждать, пока внешний ресурс станет доступным.Это может сократить время, затрачиваемое вашим программным обеспечением на заблокированный недоступный ресурс, потому что вы можете сделать так, чтобы другой поток в это время делал что-то другое.Вы также можете использовать это для одновременного запуска нескольких внешних асинхронных процессов (например, запрос нескольких веб-серверов через HTTP).Если у вас мало блокировок, связанных с ресурсами, это вам не поможет.

Что касается вопроса о разбиении списка: чтобы получить доступ к элементу списка, вы должны пройти все предыдущие элементы.,Хотя этот обход теоретически может быть разделен на несколько потоков или процессов, накладные расходы на связь, вероятно, сделают его намного медленнее, чем простое разделение элементов в одном процессе.Или используя массивы.

2 голосов
/ 05 ноября 2010

Ответ на вопрос из комментариев.Ответ не совсем вписывается в сам комментарий.

Существует блокировка во время выполнения OCaml.Блокировка снимается, когда поток OCaml собирается ввести функцию C, которую

  • может заблокировать;
  • может занять много времени.

Таку вас может быть только один поток OCaml, использующий кучу, но иногда вы можете использовать функции C без использования кучи, работающие параллельно с ним.

См., например, файл ocaml-3.12.0/otherlibs/unix/write.c

memmove (iobuf, &Byte(buf, ofs), numbytes); // if we kept the data in the heap
                                            // the GC might move it from
                                            // under our feet.
enter_blocking_section();                   // release lock.
                                            // Another OCaml thread may
                                            // start in parallel of this one now.
ret = write(Int_val(fd), iobuf, numbytes);
leave_blocking_section();                   // take lock again to continue
                                            // with Ocaml code.
...