Устранение неполадок с датчиком состояния - PullRequest
0 голосов
/ 29 мая 2018

Я пытаюсь создать датчик с состоянием, join-averages ( здесь ).

Запуск блока let показывает, что я правильно соединяю значения.Но результат вывода по-прежнему не имеет объединенного значения.

...
c' {:tick-list {:uuid 1, :last-trade-price 11.1}, :ema-list {:uuid 1, :last-trade-price-exponential 10}, :sma-list {:uuid 1, :last-trade-price-average 10.1}}
record {:uuid 1, :last-trade-price 11.1}
c' {:tick-list {:uuid 2, :last-trade-price 11.2}, :ema-list {:uuid 2, :last-trade-price-exponential 11}, :sma-list {:uuid 2, :last-trade-price-average 10.2}}
record {:uuid 2, :last-trade-price 11.2}
...
c' {:tick-list {:uuid 3, :last-trade-price 11.3}, :ema-list {:uuid 3, :last-trade-price-exponential 12}, :sma-list {:uuid 3, :last-trade-price-average 10.3}}
record {:uuid 1, :last-trade-price-exponential 10}
record {:uuid 2, :last-trade-price-exponential 11}
record {:uuid 3, :last-trade-price 11.3}
record {:uuid 3, :last-trade-price-exponential 12}

Есть какие-либо идеи относительно того, почему join-averages преобразователь с сохранением состояния не возвращает правильный результат?

CODE

(:require [clojure.core.async :refer [chan sliding-buffer <! go-loop pipeline onto-chan] :as async]
          [clojure.set :refer [subset?]]
          [clojure.tools.logging :as log])

(defn has-all-lists? [averages-map]
  (subset? #{:tick-list :sma-list :ema-list} (->> averages-map keys (into #{}))))

(defn join-averages []
  (let [state (atom {})]
    (fn [rf]
      (fn
        ([] (rf))
        ([accumulator] (rf accumulator))
        ([accumulator input]
         (let [uuid (:uuid input)
               entry (cond
                       (:last-trade-price-exponential input) {:ema-list input}
                       (:last-trade-price-average input) {:sma-list input}
                       (:last-trade-price input) {:tick-list input})]

           (if-let [current (get @state uuid)]

             (let [_ (swap! state update-in [uuid] merge entry)
                   c' (get @state uuid)]

               (log/info "c'" c')
               (log/info "accumulator" accumulator)
               (log/info "state" (with-out-str (clojure.pprint/pprint @state)))

               (if (has-all-lists? c')
                 c'
                 (rf accumulator input)))

             (do (swap! state merge {uuid entry})
                 (rf accumulator input)))))))))

(comment

  (let [ema-list [{:uuid "1" :last-trade-price-exponential 10}
                  {:uuid "2" :last-trade-price-exponential 11}
                  {:uuid "3" :last-trade-price-exponential 12}]
        sma-list [{:uuid "1" :last-trade-price-average 10.1}
                  {:uuid "2" :last-trade-price-average 10.2}
                  {:uuid "3" :last-trade-price-average 10.3}]
        tick-list [{:uuid "1" :last-trade-price 11.1}
                   {:uuid "2" :last-trade-price 11.2}
                   {:uuid "3" :last-trade-price 11.3}]

        ec (chan (sliding-buffer 100))
        sc (chan (sliding-buffer 100))
        tc (chan (sliding-buffer 100))

        _ (onto-chan ec ema-list)
        _ (onto-chan sc sma-list)
        _ (onto-chan tc tick-list)

        merged-ch (async/merge [tc sc ec])
        output-ch (chan (sliding-buffer 100) (join-averages))]

    (async/pipeline 1 output-ch (join-averages) merged-ch)
    (go-loop [r (<! output-ch)]
      (when-not (nil? r)
        (log/info "record" r)
        (recur (<! output-ch))))))

1 Ответ

0 голосов
/ 29 мая 2018

Вы не дали нам, как должен выглядеть ваш результат, поэтому я должен догадаться.

Кроме того, чтобы посмотреть, как реализовать преобразователь с отслеживанием состояния, я обычно просто смотрю на distinct.Вы должны инициировать свое состояние после инициализации датчика.Это должно работать:

(defn has-all-lists? [averages-map]
  (set/subset? #{:tick-list :sma-list :ema-list} (->> averages-map keys (into #{}))))

(defn join-averages []
  (fn [rf]
    (let [state (atom {})]
      (fn
        ([] (rf))
        ([accumulator] (rf accumulator))
        ([accumulator input]
         (let [uuid (:uuid input)
               entry (condp #(%1 %2) input
                       :last-trade-price-exponential {:ema-list input}
                       :last-trade-price-average {:sma-list input}
                       :last-trade-price {:tick-list input})]
           (let [nv (swap! state update-in [uuid] merge entry)
                 c' (get nv uuid)]
             (if (has-all-lists? c')
               (rf accumulator c')
               accumulator))))))))

(let [ema-list [{:uuid "1" :last-trade-price-exponential 10}
                {:uuid "2" :last-trade-price-exponential 11}
                {:uuid "3" :last-trade-price-exponential 12}]
      sma-list [{:uuid "1" :last-trade-price-average 10.1}
                {:uuid "2" :last-trade-price-average 10.2}
                {:uuid "3" :last-trade-price-average 10.3}]
      tick-list [{:uuid "1" :last-trade-price 11.1}
                 {:uuid "2" :last-trade-price 11.2}
                 {:uuid "3" :last-trade-price 11.3}]]
  (into []
        (join-averages)
        (concat ema-list sma-list tick-list)))
...