Многопоточная работа выполняется путем управления одновременным доступом к изменяемым
общие состояния, т. е. у вас есть замок вокруг общей структуры данных,
и каждый поток читает или пишет в него.
Однако рекомендуется минимизировать количество данных,
доступ одновременно. Очереди - это способ отделить работников от каждого
другие, каждый поток управляет своим локальным состоянием и обменивается данными
только через сообщения; это потокобезопасно, потому что доступ к
очереди контролируются замками и состоянием
переменные .
То, что вы делаете в своем основном потоке, это опрос , когда очередь
пустой; это может работать, но это контрпродуктивно, так как очереди
используются в качестве механизма синхронизации, но здесь вы делаете
Синхронизируй себя.
(ql:quickload :lparallel)
(defpackage :so (:use :cl
:lparallel
:lparallel.queue
:lparallel.kernel-util))
(in-package :so)
Давайте изменим foo
, чтобы он получал две очереди, одну для входящих
запросы и один для ответов. Здесь мы выполняем простое преобразование в
данные отправляются и для каждого входного сообщения, есть ровно один
выходное сообщение, но это не всегда так.
(defun foo (in out)
(push-queue (1+ (pop-queue in)) out))
Измените test
, чтобы поток управления основывался только на чтении / записи в очереди:
(defun test ()
(with-temp-kernel (1)
(let ((c (make-channel))
(foo-in (make-queue))
(foo-out (make-queue)))
(submit-task c #'foo foo-in foo-out)
;; submit data to task (could be blocking)
(push-queue 0 foo-in)
;; wait for message from task (could be blocking too)
(pop-queue foo-out))))
Но как можно избежать опроса в тесте, если выполняется несколько задач? Разве вам не нужно постоянно проверять, когда выполняется какой-либо из них, чтобы вы могли продвинуться в очереди?
Вы можете использовать другой механизм параллелизма, аналогичный listen и poll / epoll , где вы смотрите несколько
источник событий и реагировать всякий раз, когда один из них готов. Существуют такие языки, как Go ( select ) и Erlang ( receive ), где
это вполне естественно выразить. На стороне Lisp библиотека Calispel предоставляет аналогичный механизм чередования (pri-alt
и fair-alt
). Например, следующее взято из кода теста Калиспеля:
(pri-alt ((? control msg)
(ecase msg
(:clean-up (setf cleanup? t))
(:high-speed (setf slow? nil))
(:low-speed (setf slow? t))))
((? channel msg)
(declare (type fixnum msg))
(vector-push-extend msg out))
((otherwise :timeout (if cleanup? 0 nil))
(! reader-results out)
(! thread-expiration (bt:current-thread))
(return)))
В случае lparallel такого механизма нет, но вы можете пойти довольно далеко только с очередями, если вы пометите свои сообщения идентификаторами.
Если вам нужно отреагировать, как только или задача t1
или t2
даст результат, тогда обе эти задачи записывают в один и тот же канал результатов:
(let ((t1 (foo :id 1 :in i1 :out res))
(t2 (bar :id 2 :in i2 :out res)))
(destructuring-bind (id message) (pop-queue res)
(case id
(1 ...)
(2 ...))))
Если вам нужно синхронизировать код для случаев, когда t1
и t2
генерируют результат, разрешите им записывать в разные каналы:
(let ((t1 (foo :id 1 :in i1 :out o1))
(t2 (bar :id 2 :in i2 :out o2)))
(list (pop-queue o1)
(pop-queue o2)))