Как реализовать параллельное логическое или с досрочным завершением в Clojure - PullRequest
5 голосов
/ 30 апреля 2019

Я хотел бы определить предикат, который, принимая в качестве входных данных некоторые предикаты с соответствующими входными данными (они могут быть заданы как ленивая последовательность вызовов), запускает их параллельно и вычисляет логический или из результатов, таким образомчто в тот момент, когда вызов предиката завершается, возвращая true, все вычисление также завершается (возвращая true).

Помимо оптимизации времени, это также поможет в некоторых случаях избежать прерывания (некоторыевызовы предикатов не могут прерваться).На самом деле, интерпретируя отсутствие завершения как третье undefined значение, этот предикат имитирует операцию или в логике Клини К3 (объединение в начальной центрированной алгебре Клини ).

Нечто подобное представлено здесь для семейства Хаскелл.Есть ли (желательно простой) способ сделать это в Clojure?

РЕДАКТИРОВАТЬ : Я решил добавить некоторые пояснения после прочтения комментариев.

(a) Сначалавсе, что происходит после исчерпания пула потоков, имеет меньшее значение.Я думаю, что создание пула потоков, достаточно большого для наших нужд, является разумным соглашением.

(b) Наиболее важным требованием является то, чтобы вызовы предикатов начинались параллельно и после завершения вызова предиката возвращали true,все остальные работающие потоки прерываются.Предполагаемое поведение таково:

  • , если есть вызов предиката, возвращающий true: параллель или возвращает true
  • иначе, если есть вызов предиката, который не завершается: параллель или не заканчивается
  • иначе: параллель или возвращает false

Другими словами, он ведет себя как объединение в 3-элементной решетке, заданной false <<code>undefined <<code>true, где undefined представляет не окончание.

(c) Параллель или должна быть в состоянии принять в качестве входных данных много предикатов и много входных предикатов (каждый из которых соответствуетпредикат).Но было бы еще лучше, если бы в качестве входных данных использовалась ленивая последовательность.Затем, назвав параллель или pany (для «параллельного любого»), мы могли бы иметь такие вызовы, как:

  • (pany (map (comp eval list) predicates inputs))
  • (pany (map (comp eval list) predicates (repeat input)))
  • (pany (map (comp eval list) (repeat predicate) inputs)), что эквивалентно (pany (map predicate (unchunk inputs)))

В качестве последнего замечания я думаю, что вполне естественно просить такие вещи, как pany, двойной pall или механизмдля того, чтобы такие параллельные сокращения с ранним завершением могли быть легко реализованы или даже встроены в язык, ориентированный на параллелизм, такой как Clojure.

Ответы [ 2 ]

0 голосов
/ 03 мая 2019

Рассмотрите ли вы принятие core.async для обработки параллельных задач с async/go или async/thread и досрочного возврата с async/alts!?

Например, чтобы превратить базовую функцию or из последовательной в параллельную. Мы можем создать макрос (я назвал его por), чтобы обернуть входные функции (или предикаты) в async/thread, а затем выбрать сокет async/alts! поверх них:

(defmacro por [& fns]
  `(let [[v# c#] (async/alts!!
                  [~@(for [f fns]
                       (list `async/thread f))])]
     v#))

(time
 (por (do (println "running a") (Thread/sleep 30) :a)
      (do (println "running b") (Thread/sleep 20) :b)
      (do (println "running c") (Thread/sleep 10) :c)))
;; running a
;; running b
;; running c
;; "Elapsed time: 11.919169 msecs"
;; => :c

По сравнению с оригинальным or (который запускается серийно):

(time
 (or (do (println "running a") (Thread/sleep 30) :a)
     (do (println "running b") (Thread/sleep 20) :b)
     (do (println "running c") (Thread/sleep 10) :c)))
;; running a
;; => :a
;; "Elapsed time: 31.642506 msecs"
0 голосов
/ 03 мая 2019

Я буду определять наши предикаты в терминах редуцирующей функции.На практике мы могли бы переопределить все итерационные функции Clojure для поддержки этой параллельной операции, но я просто буду использовать в качестве примера Reduce.

Я определю вычислительную функцию.Я просто использую тот же самый, но ничто не мешает вам иметь много.Функция "true", если она накапливает 1000.

(defn computor [acc val]
        (let [new (+' acc val)] (if (> new 1000) (reduced new) new)))

(reduce computor 0 (range))
;; =>
1035

(reduce computor 0 (range Long/MIN_VALUE 0))
;; =>
;; ...this is a proxy for a non-returning computation

;; wrap these up in a form suitable for application of reduction
(def predicates [[computor 0 (range)] 
                 [computor 0 (range Long/MIN_VALUE 0)]])

Теперь давайте разберемся с этим.Я хочу сделать шаг в каждом вычислении, и если одно из вычислений завершится, я хочу его вернуть.На самом деле один шаг за раз с использованием pmap очень медленный - единицы работы слишком малы, чтобы их можно было поточить.Здесь я изменил вещи, чтобы сделать 1000 итераций каждой единицы работы, прежде чем двигаться дальше.Вы, вероятно, настроите это на основе своей рабочей нагрузки и стоимости шага.

(defn p-or-reducer* [reductions]
        (let [splits (map #(split-at 1000 %) reductions) ;; do at least 1000 iterations per cycle
              complete (some #(if (empty? (second %)) (last (first %))) splits)]
          (or complete (recur (map second splits)))))

Затем я оберну это в драйвере.

(defn p-or [s]
  (p-or-reducer* (map #(apply reductions %) s)))

(p-or predicates)
;; =>
1035

Где вставить параллелизм ЦП?s / map / pmap / in p-or-reducer * должен это сделать.Я предлагаю просто распараллелить первую операцию, так как это приведет к вычислению сокращающих последовательностей.

(defn p-or-reducer* [reductions]
        (let [splits (pmap #(split-at 1000 %) reductions) ;; do at least 1000 iterations per cycle
              complete (some #(if (empty? (second %)) (last (first %))) splits)]
          (or complete (recur (map second splits)))))

(def parallelism-tester (conj (vec (repeat 40000 [computor 0 (range Long/MIN_VALUE 0)]))
                             [computor 0 (range)]))

(p-or parallelism-tester) ;; terminates even though the first 40K predicates will not

Очень трудно определить производительную универсальную версию этого.Без знания стоимости за итерацию сложно выработать эффективную стратегию параллелизма - если одна итерация займет 10 секунд, то мы, вероятно, сделаем один шаг за раз.Если это занимает 100 нс, нам нужно делать много шагов одновременно.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...