Я пытаюсь создать датчик с состоянием, join-averages
( здесь ).
Запуск блока let показывает, что я правильно соединяю значения.Но результат вывода по-прежнему не имеет объединенного значения.
...
c' {:tick-list {:uuid 1, :last-trade-price 11.1}, :ema-list {:uuid 1, :last-trade-price-exponential 10}, :sma-list {:uuid 1, :last-trade-price-average 10.1}}
record {:uuid 1, :last-trade-price 11.1}
c' {:tick-list {:uuid 2, :last-trade-price 11.2}, :ema-list {:uuid 2, :last-trade-price-exponential 11}, :sma-list {:uuid 2, :last-trade-price-average 10.2}}
record {:uuid 2, :last-trade-price 11.2}
...
c' {:tick-list {:uuid 3, :last-trade-price 11.3}, :ema-list {:uuid 3, :last-trade-price-exponential 12}, :sma-list {:uuid 3, :last-trade-price-average 10.3}}
record {:uuid 1, :last-trade-price-exponential 10}
record {:uuid 2, :last-trade-price-exponential 11}
record {:uuid 3, :last-trade-price 11.3}
record {:uuid 3, :last-trade-price-exponential 12}
Есть какие-либо идеи относительно того, почему join-averages
преобразователь с сохранением состояния не возвращает правильный результат?
CODE
(:require [clojure.core.async :refer [chan sliding-buffer <! go-loop pipeline onto-chan] :as async]
[clojure.set :refer [subset?]]
[clojure.tools.logging :as log])
(defn has-all-lists? [averages-map]
(subset? #{:tick-list :sma-list :ema-list} (->> averages-map keys (into #{}))))
(defn join-averages []
(let [state (atom {})]
(fn [rf]
(fn
([] (rf))
([accumulator] (rf accumulator))
([accumulator input]
(let [uuid (:uuid input)
entry (cond
(:last-trade-price-exponential input) {:ema-list input}
(:last-trade-price-average input) {:sma-list input}
(:last-trade-price input) {:tick-list input})]
(if-let [current (get @state uuid)]
(let [_ (swap! state update-in [uuid] merge entry)
c' (get @state uuid)]
(log/info "c'" c')
(log/info "accumulator" accumulator)
(log/info "state" (with-out-str (clojure.pprint/pprint @state)))
(if (has-all-lists? c')
c'
(rf accumulator input)))
(do (swap! state merge {uuid entry})
(rf accumulator input)))))))))
(comment
(let [ema-list [{:uuid "1" :last-trade-price-exponential 10}
{:uuid "2" :last-trade-price-exponential 11}
{:uuid "3" :last-trade-price-exponential 12}]
sma-list [{:uuid "1" :last-trade-price-average 10.1}
{:uuid "2" :last-trade-price-average 10.2}
{:uuid "3" :last-trade-price-average 10.3}]
tick-list [{:uuid "1" :last-trade-price 11.1}
{:uuid "2" :last-trade-price 11.2}
{:uuid "3" :last-trade-price 11.3}]
ec (chan (sliding-buffer 100))
sc (chan (sliding-buffer 100))
tc (chan (sliding-buffer 100))
_ (onto-chan ec ema-list)
_ (onto-chan sc sma-list)
_ (onto-chan tc tick-list)
merged-ch (async/merge [tc sc ec])
output-ch (chan (sliding-buffer 100) (join-averages))]
(async/pipeline 1 output-ch (join-averages) merged-ch)
(go-loop [r (<! output-ch)]
(when-not (nil? r)
(log/info "record" r)
(recur (<! output-ch))))))