Как правильно завершить блокирующий поток (Lparallel Common Lisp) - PullRequest
3 голосов
/ 11 апреля 2019

В Lparallel API рекомендуемый способ завершить все многопоточные задачи - остановить ядро ​​с помощью (lparallel:end-kernel).Но когда поток блокируется - например, с (pop-queue queue1) ожиданием появления элемента в очереди - он все равно будет активен, когда ядро ​​остановлено.В этом случае (по крайней мере, в SBCL) завершение работы ядра иногда (но не каждый раз) завершается неудачно:

debugger invoked on a SB-KERNEL:BOUNDING-INDICES-BAD-ERROR in thread
#<THREAD "lparallel" RUNNING {1002F04973}>:
  The bounding indices 1 and NIL are bad for a sequence of length 0.
See also:
  The ANSI Standard, Glossary entry for "bounding index designator"
  The ANSI Standard, writeup for Issue SUBSEQ-OUT-OF-BOUNDS:IS-AN-ERROR

debugger invoked on a SB-SYS:INTERACTIVE-INTERRUPT in thread
#<THREAD "main thread" RUNNING {10012E0613}>:
  Interactive interrupt at #x1001484328.

Я предполагаю, что это связано с тем, что поток блокировки не завершается правильно.Как правильно завершить блокирующий поток перед выключением ядра?(API говорит, что kill-tasks следует использовать только в исключительных обстоятельствах, которые я принимаю, чтобы не применять к этим «нормальным» условиям отключения.)

1 Ответ

5 голосов
/ 11 апреля 2019

Проблема с уничтожением потока заключается в том, что это может произойти где угодно, когда поток может находиться в любом неизвестном состоянии. Единственный способ безопасного завершения потока - это элегантное завершение его работы, то есть вы ожидаете, что при обычных операциях у потока будет возможность узнать, что он должен перестать работать. Тогда вы сможете правильно очистить свои ресурсы, закрыть базы данных, освободить чужие указатели, зарегистрировать все, ...

В очередях, которые вы используете, есть операции, которые могут быть приостановлены, это простой, но безопасный способ гарантировать, что вы можете избежать блокировки навсегда и правильно завершить работу. Но это не единственный вариант (вы можете использовать их в дополнение к тому, что показано ниже).

Общий / глобальный флаг

Когда происходит тайм-аут или когда вы получаете сообщение, вы проверяете глобальную логическую переменную (или переменную, которая является общей для всех заинтересованных потоков). Это также простой способ выхода, и он может быть прочитан несколькими потоками. Однако это одновременный доступ, поэтому вы должны использовать блокировки или атомарные операции (http://www.sbcl.org/manual/#Atomic-Operations),, например, использовать defglobal и тип fixnum с atomic-incf и т. Д.

Управляющие сообщения

Отправка управляющих данных в очереди и их использование, чтобы определить, как корректно завершить работу и как распространять информацию по трубам, или как перезапустить вещи. Это безопасно (только передача сообщений) и позволяет использовать любой элемент управления, который вы захотите внедрить в свой поток.

(defpackage :so (:use :cl :bt :lparallel.queue))
(in-package :so)

Давайте определим две службы.

Первый из них возвращает свой ввод:

(defun echo (in out)
  (lambda ()
    (loop
      for value = (pop-queue in)
      do (push-queue value out)
      until (eq value :stop))))

Обратите внимание, как ожидается, что он будет правильно завершен при вводе :stop, и как он также распространяет сообщение :stop в свою очередь вывода.

Второй поток выполнит модульное дополнение, а также немного поспит между запросами:

(defun modulo-adder (x m in out)
  (lambda ()
    (loop
      for value = (progn (sleep 0.02)
                         (pop-queue in))
      do (push-queue (typecase value
                       (keyword value)
                       (number (mod (+ x value) m)))
                     out)
      until (eq value :stop))))

Создание очередей:

(defparameter *q1* (make-queue))
(defparameter *q2* (make-queue))

Создание тем:

(progn
  (bt:make-thread (echo *q1* *q2*) :name "echo")
  (bt:make-thread (modulo-adder 5 1024 *q2* *q1*) :name "adder"))

Оба потока связаны друг с другом по кругу, создавая бесконечный цикл дополнений. В настоящее время между потоками не передается никакого значения, и вы можете видеть, что они работают, например, с slime-list-threads или любым другим способом, предоставленным реализацией; В любом случае (bt:all-threads) возвращает список.

slime-list-threads

10 adder                          Running 
11 echo                           Running 
...

Добавить элемент, теперь существует бесконечный обмен данными между потоками:

(push-queue 10 *q1*)

Подождите, затем остановите их обоих:

(push-queue :stop *q1*)

Оба потока остановились изящно (они больше не видны в списках потоков). Мы можем проверить, что остается в очередях (результат варьируется от одного теста к другому):

(list (try-pop-queue *q1*)
      (try-pop-queue *q2*))
(99 NIL)

(list (try-pop-queue *q1*)
      (try-pop-queue *q2*))
(:STOP NIL)

(list (try-pop-queue *q1*)
      (try-pop-queue *q2*))
(NIL NIL)

прерывание потока

Вы создаете сервис, управляемый сообщениями или глобальным флагом, но затем у вас возникает ошибка, и поток зависает. Вместо того, чтобы убить его и потерять все, вы хотите, по крайней мере, правильно размотать стек потоков. Это тоже опасно, но вы можете использовать bt:interrupt, чтобы остановить поток везде, где он сейчас запущен и выполнить функцию.

(define-condition stop () ())
(defun signal-stop ()
  (signal 'stop))

(defun endless ()
  (let ((output *standard-output*))
    (lambda ()
      (print "START" output)
      (unwind-protect (handler-case (loop)
                        (stop ()
                          (print "INTERRUPTED" output)))
        (print "STOP" output)))))

Запустите это:

(bt:make-thread (endless) :name "loop")

Это печатает "START" и циклы. Затем мы прерываем это:

(bt:interrupt-thread (find "loop"
                           (bt:all-threads)
                           :test #'string=
                           :key #'bt:thread-name)
                     #'signal-stop)

Печатается следующее:

"INTERRUPTED" 
"STOP" 

Эти сообщения не будут напечатаны, если поток будет уничтожен, но учтите, что вы все равно можете получить поврежденные данные, учитывая случайность прерывания. Кроме того, он может разблокировать блокирующие вызовы, такие как sleep или pop-queue.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...