Как запустить несколько циклов блокировки одновременно в Common Lisp. Объединение cl-asyn c с очередями в пуле потоков - PullRequest
5 голосов
/ 12 февраля 2020

Я новичок в программировании, а также в Common Lisp. Я пытаюсь решить следующую проблему:

Есть источники потока (S1, S2, S3), два процессора (P1, P2) (под процессором я не имею в виду процессор-процессор, а скорее функции обработки / подсистемы) и четыре направления потока (D1, D2, D3, D4). Под потоком я имею в виду, что данные поступают непрерывно, но могут периодически. Таким образом, действия чтения / записи могут блокироваться. Данные из потока должны быть агрегированы, а затем, в зависимости от того, что они есть, должны быть обработаны одним из процессоров. Каждый процессор производит (из его входного агрегата) четыре вида субагрегатов, которые затем отправляются одному из четырех вышеупомянутых пунктов назначения.

Псевдокод, который у меня есть для проблемы, выглядит примерно так:

Три цикла, по одному для S1, S2, S3, для агрегирования и отправки данных. Для конкретного источника (скажем, S2) псевдокод имеет вид:

 loop-2 : read from S2,
            when sufficient data aggregate, 
                if aggregate is kind-1 send to P1 
                  else send to P2.

Два цикла, по одному для P1, P2, для обработки и деагрегации. Для конкретного процессора (скажем, P1) псевдокод:

In a loop for P1:   take aggregate of kind-1,
                       process it to produce one or some of four sub-aggregates.
                          case: sub-aggregate 1 -> serialize and send/write to D1,
                                sub-aggregate 2 -> serialize and send/write to D2,
                                sub-aggregate 3 -> serialize and send/write to D3,
                                sub-aggregate 4 -> serialize and send/write to D4.

Между процессором и местом назначения я хочу использовать другой канал / очередь, но не упомянул, чтобы мой вопрос был проще для меня.

В моем понимании есть около десяти блокирующих точек. Три, если данные отсутствуют в источниках (S1, S2, S3); три, когда недостаточно данных на процессорах (P1, P2), и четыре, когда места назначения (D1, D2, D3, D4) не готовы к записи.

Итак, мой вопрос: как мне запустить все эти циклы одновременно (это одновременно слово?), Не блокируя друг друга; а также другие вещи, которые выполняются в программе.

Предыдущая работа: я рассмотрел этот ТАК вопрос, опрашивающие и уведомители из cl-asyn c. Глядя на Cliki-Concurrency , я также увидел threading-queue . Не говоря уже о cl-flow и green-thread . CL-поток упоминает, что это «Библиотека для асинхронного неблокирующего параллелизма в Common Lisp». Есть ли способ, я мог бы использовать библиотеку очереди с cl-asyn c или cl-flow? Например, flow-одновременно кажется правильным. Но можно ли его использовать с зеленым потоком cl-asyn c для запуска бесконечных циклов?

Моя сложность: я пытался прочитать каждую ссылку, о которой упоминал выше, и понять как можно больше. Но, честно говоря, я не могу понять, где / как вообще начать. Спасибо за помощь.

Редактировать: 28 февраля 2020 г.

Я пытался использовать cl-flow следующим образом. После инициализации системы потоков


(ql:quickload "simple-flow-dispatcher")
(ql:quickload "cl-muth")
(ql:quickload "cl-flow")
(ql:quickload "bt-semaphore")

(use-package :cl-flow :bt-semaphore)

(defvar *output* *standard-output*)

(defun print-error (e)
  (format *output* "~A" e))

(defvar *dispatcher* (simple-flow-dispatcher:make-simple-dispatcher
                      :threads 4
                      :error-handler #'print-error))

(defun run-flow (flow)
  (run *dispatcher* flow))

я использовал следующие функции:


(defun hello-world (n)
  (sleep n)
  (format t (concatenate 'string (gap n) "Loop-" (write-to-string n) "~%")))


(defun gap(n)
  (if (= n 0)
      ""
      (concatenate 'string "         " (gap (- n 1)))))

(defun loop-hello (n)
  (loop
     (hello-world n)))

(defun test-flow ()
  (run-flow 
     (flow:concurrently
         (flow:asynchronously
          (loop-hello 3))
         (flow:asynchronously
          (loop-hello 2))
         (flow:asynchronously
         (loop-hello 1)))))

hello-world - это простая функция, и я включил функцию gap, чтобы вставить некоторое пустое пространство перед текстом , Как и положено, я ожидал чередования трех столбцов напечатанных строк (одновременно, асинхронно и т. Д. 1078 *), но я получил только один (первый l oop -hello) столбец.

Мой текущий понимание следующее, пожалуйста, поправьте меня: l oop и sleep являются синхронными операторами, поэтому выполнение блокируется. Поэтому я искал asyn c версию l oop и уснул. Мне интересно, могу ли я сделать это с помощью cl-asyn c.

Тем временем я пытался упростить свою проблему. Вчера я также наткнулся на этот вопрос и его ответ. Я пытаюсь это понять.

Кроме того, поскольку я разместил свой исходный вопрос с использованием псевдокода, мне было интересно, как бы я хотел (скажем) библиотеку для обеспечения функциональности для меня (даже если она не существует) на данный момент).

Я попытался найти асин c код в Google, и большинство результатов показывают javascript библиотеки. До сих пор я мог смутно растереть только два. Python, Clojure (Java). Запись для небольшой функции, которая читает asyn c из q1, обрабатывает значения, считанные с v1 по v2, и записывает в q2 (q1 и q2 - очереди). Мне кажется, что я мог бы написать (сказать) так:

async def p1(x):
    #compute v2 from v1 and return v2

async def keep_doing_p1(q1, q2):
    while True:
        v1 = await q1.pop()
        v2 = p1(v1) # function p1 defined above
        await q2.push(v2)

Конечно, подходящие реализации q1 и q2 должны быть там. И в Clojure (Java), используя core.asyn c lib соответствующим образом:

(def q1 (async/chan))
(def q2 (async/chan))

(defn p1 [v1]
  ;;; Need a transducer version
  ;;; compute v2 from v1 and return)

(async/go
   (-->
       (async/< q1)
       (p1)
       (async/q2)))

Код, который я разместил выше, может быть неправильным. Я написал их на основе моего (очень вероятно) поверхностного понимания. У меня нет Python, Clojure (Java) опыта. Примеры не должны спровоцировать любые войны пламени. Я думаю, что все языки могут быть (являются) хорошими, мощными, удобными и обслуживать разных программистов. Я хотел бы решить проблему в CL. Я старший человек (по возрасту), но младший, по сути, новичок (в программировании). Я действительно люблю писать это в CL. Не то чтобы я стал или скоро стану великим программистом. Но с CL я чувствую, OMG, даже я могу писать программы!

Как бы то ни было, поэтому возвращаюсь к вопросу. Поэтому я хотел бы, чтобы библиотека обеспечивала синтаксис / функциональность следующим образом:

(cc-forever q1 q2 p1
  (push q2 (p1 (pop q1))))

;;; many other such cc-forever, operating concurrently

Как вы могли догадаться, cc-forever означает (ConCurrently-run FOREVER).

Так что я ищу помощь в создании такой функции / библиотеки. Пожалуйста, предложите лучшие / более простые альтернативы, если мой мыслительный процесс все еще сложен. Спасибо.

Опубликую свои попытки позже,

...