У меня есть следующий сценарий:
Есть какой-то сервис, который я использую для извлечения некоторых данных, передавая его на мой ввод.
Имея некоторые входные параметры, мне нужно выполнить N запросов к вышеупомянутому сервису, собрать выходные данные и выполнить некоторую нагрузку на процессор для каждого выхода.
Я пытаюсь добиться этого, используя основные / асинхронные каналы.
Вот моя попытка (схематически), которая вроде работает, но она не ведет себя, как хотелось бы.
Буду благодарен за любые подсказки о том, как его улучшить.
(defn produce-inputs
[in-chan inputs]
(let input-names-seq (map #(:name %) inputs)]
(doseq [input-name input-names-seq]
(async/go
(async/>! in-chan input-name)))))
(defn consume
[inputs]
(let [in-chan (async/chan 1)
out-chan (async/chan 1)]
(do
(produce-inputs in-chan inputs)
(async/go-loop []
(let [input-name (async/<! in-chan)]
(do
(retrieve-resource-from-service input-name
; response handler
(fn [resp]
(async/go
(let [result (:result resp)]
(async/>! out-chan result)))))
(when input-name
(recur)))))
; read from out-chan and do some heavy work for each entry
(async/go-loop []
(let [result (async/<! out-chan)]
(do-some-cpu-heavy-work result))))))
; entry point
(defn run
[inputs]
(consume inputs))
Есть ли способ обновить его так, чтобы в каждый момент времени было не более пяти активных запросов на обслуживание (retrieve-resource-from-service
)?
Если мое объяснение неясно, задавайте вопросы, я его обновлю.