Clojurescript: обрабатывать запросы порциями с основными / асинхронными каналами - PullRequest
0 голосов
/ 16 мая 2018

У меня есть следующий сценарий:

Есть какой-то сервис, который я использую для извлечения некоторых данных, передавая его на мой ввод.

Имея некоторые входные параметры, мне нужно выполнить N запросов к вышеупомянутому сервису, собрать выходные данные и выполнить некоторую нагрузку на процессор для каждого выхода.

Я пытаюсь добиться этого, используя основные / асинхронные каналы.

Вот моя попытка (схематически), которая вроде работает, но она не ведет себя, как хотелось бы. Буду благодарен за любые подсказки о том, как его улучшить.

(defn produce-inputs
  [in-chan inputs]
  (let input-names-seq (map #(:name %) inputs)]
    (doseq [input-name input-names-seq]
      (async/go
        (async/>! in-chan input-name)))))

(defn consume
  [inputs]
  (let [in-chan (async/chan 1)
        out-chan (async/chan 1)]
        (do
          (produce-inputs in-chan inputs)
          (async/go-loop []
                   (let [input-name (async/<! in-chan)]
                     (do
                         (retrieve-resource-from-service input-name 
                                                         ; response handler
                                                         (fn [resp]
                                                           (async/go
                                                             (let [result (:result resp)]
                                                               (async/>! out-chan result)))))
                         (when input-name
                           (recur)))))

     ; read from out-chan and do some heavy work for each entry
     (async/go-loop []
                   (let [result (async/<! out-chan)]
                         (do-some-cpu-heavy-work result))))))

; entry point
(defn run
  [inputs]
  (consume inputs))

Есть ли способ обновить его так, чтобы в каждый момент времени было не более пяти активных запросов на обслуживание (retrieve-resource-from-service)?

Если мое объяснение неясно, задавайте вопросы, я его обновлю.

1 Ответ

0 голосов
/ 17 мая 2018

Вы можете создать другой канал, который будет действовать в качестве корзины токенов для ограничения частоты ваших запросов.

См. Эту ссылку для примера использования корзины токенов в секундуограничение.

Чтобы ограничить количество одновременных запросов, вы можете сделать что-то вроде:

(defn consume [inputs]
  (let [in-chan (async/chan 1)
        out-chan (async/chan 1)
        bucket (async/chan 5)]
    ;; ...
    (dotimes [_ 5] (async/put! bucket :token))
    (async/go-loop []
      (let [input-name (async/<! in-chan)
            token (async/<! bucket)]
        (retrieve-resource-from-service
          input-name 
          ; response handler
          (fn [resp]
            (async/go
              (let [result (:result resp)]
                (async/>! out-chan result)
                (async/>! bucket token)))))
        (when input-name
          (recur))))
    ;; ...
    ))

Создается новый канал, bucket, и помещаются пять элементов.внутрь.Перед отправкой запроса мы берем токен из корзины и кладем его обратно после завершения запроса.Если в канале bucket нет токенов, нам нужно подождать, пока один из запросов не будет выполнен.

Примечание: это всего лишь набросок кода, возможно, вам придется его исправить.В частности, если у вас есть какие-либо обработчики ошибок в вашей функции retrieve-resource-from-service, вы должны положить токен обратно в случае ошибки, чтобы избежать возможных тупиков.

...