Использование очередей в Lparallel Library (Common Lisp) - PullRequest
3 голосов
/ 24 марта 2019

Базовое обсуждение очередей в библиотеке lparallel в https://z0ltan.wordpress.com/2016/09/09/basic-concurrency-and-parallelism-in-common-lisp-part-4a-parallelism-using-lparallel-fundamentals/#channels говорит, что очереди «разрешают передачу сообщений между рабочими потоками».В приведенном ниже тесте используется общая очередь для координации основного и подчиненного потока, где основной просто ожидает завершения подчиненного, прежде чем выйти:

(defun foo (q)
  (sleep 1)
  (lparallel.queue:pop-queue q))  ;q is now empty

(defun test ()
  (setf lparallel:*kernel* (lparallel:make-kernel 1))
  (let ((c (lparallel:make-channel))
        (q (lparallel.queue:make-queue)))
    (lparallel.queue:push-queue 0 q)
    (lparallel:submit-task c #'foo q)
    (loop do (sleep .2)
             (print (lparallel.queue:peek-queue q))
          when (lparallel.queue:queue-empty-p q)
            do (return)))
  (lparallel:end-kernel :wait t))

Это работает, как и ожидалось, с выдачей результата:

* (test)

0
0
0
0
NIL
(#<SB-THREAD:THREAD "lparallel" FINISHED values: NIL {10068F2B03}>)

Мой вопрос о том, правильно ли я использую функцию очереди lparallel или полностью.Казалось бы, очередь просто заменяет использование глобальной переменной для хранения объекта, совместно используемого потоком.В чем преимущество дизайна при использовании очереди?Как правило, рекомендуется назначать одну очередь для каждой отправленной задачи (при условии, что задача должна сообщаться)?Спасибо за более глубокое понимание.

1 Ответ

5 голосов
/ 25 марта 2019

Многопоточная работа выполняется путем управления одновременным доступом к изменяемым общие состояния, т. е. у вас есть замок вокруг общей структуры данных, и каждый поток читает или пишет в него.

Однако рекомендуется минимизировать количество данных, доступ одновременно. Очереди - это способ отделить работников от каждого другие, каждый поток управляет своим локальным состоянием и обменивается данными только через сообщения; это потокобезопасно, потому что доступ к очереди контролируются замками и состоянием переменные .

То, что вы делаете в своем основном потоке, это опрос , когда очередь пустой; это может работать, но это контрпродуктивно, так как очереди используются в качестве механизма синхронизации, но здесь вы делаете Синхронизируй себя.

(ql:quickload :lparallel)
(defpackage :so (:use :cl
                      :lparallel
                      :lparallel.queue
                      :lparallel.kernel-util))
(in-package :so)

Давайте изменим foo, чтобы он получал две очереди, одну для входящих запросы и один для ответов. Здесь мы выполняем простое преобразование в данные отправляются и для каждого входного сообщения, есть ровно один выходное сообщение, но это не всегда так.

(defun foo (in out)
  (push-queue (1+ (pop-queue in)) out))

Измените test, чтобы поток управления основывался только на чтении / записи в очереди:

(defun test ()
  (with-temp-kernel (1)
    (let ((c (make-channel))
          (foo-in (make-queue))
          (foo-out (make-queue)))
      (submit-task c #'foo foo-in foo-out)
      ;; submit data to task (could be blocking)
      (push-queue 0 foo-in)
      ;; wait for message from task (could be blocking too)
      (pop-queue foo-out))))

Но как можно избежать опроса в тесте, если выполняется несколько задач? Разве вам не нужно постоянно проверять, когда выполняется какой-либо из них, чтобы вы могли продвинуться в очереди?

Вы можете использовать другой механизм параллелизма, аналогичный listen и poll / epoll , где вы смотрите несколько источник событий и реагировать всякий раз, когда один из них готов. Существуют такие языки, как Go ( select ) и Erlang ( receive ), где это вполне естественно выразить. На стороне Lisp библиотека Calispel предоставляет аналогичный механизм чередования (pri-alt и fair-alt). Например, следующее взято из кода теста Калиспеля:

(pri-alt ((? control msg)
          (ecase msg
            (:clean-up (setf cleanup? t))
            (:high-speed (setf slow? nil))
            (:low-speed (setf slow? t))))
         ((? channel msg)
          (declare (type fixnum msg))
          (vector-push-extend msg out))
         ((otherwise :timeout (if cleanup? 0 nil))
          (! reader-results out)
          (! thread-expiration (bt:current-thread))
          (return)))

В случае lparallel такого механизма нет, но вы можете пойти довольно далеко только с очередями, если вы пометите свои сообщения идентификаторами.

Если вам нужно отреагировать, как только или задача t1 или t2 даст результат, тогда обе эти задачи записывают в один и тот же канал результатов:

(let ((t1 (foo :id 1 :in i1 :out res))
      (t2 (bar :id 2 :in i2 :out res)))
   (destructuring-bind (id message) (pop-queue res)
     (case id
       (1 ...)
       (2 ...))))

Если вам нужно синхронизировать код для случаев, когда t1 и t2 генерируют результат, разрешите им записывать в разные каналы:

(let ((t1 (foo :id 1 :in i1 :out o1))
      (t2 (bar :id 2 :in i2 :out o2)))
   (list (pop-queue o1)
         (pop-queue o2)))
...