манипулирование атомом, содержащим коллекцию ссылок в clojure - PullRequest
0 голосов
/ 12 мая 2018

У меня есть приложение, которое должно бронировать авиабилеты для клиентов в рамках указанного бюджета.Таким образом, у меня есть данные клиентов и доступные данные рейсов.Затем я разрабатываю решения в Clojure следующим образом.

Сначала я создаю атом полетов:

(def flights
   (atom []))

Затем я создаю функцию для инициализации полетов в атом, содержащий набор ссылок.Здесь я передаю данные о рейсах, которые включены далее в этот пост.

(defn initialize-flights [initial-flights]
   (reset! flights (map ref initial-flights)))

Затем я обрабатываю клиентов с помощью функции обработки клиентов следующим образом.И это то, где это действительно сбивает с толку.

(defn process-customers [customers]

(doseq [customer1 (partitionCustomerInput N-THREADS customers)]


  (doseq [customer2  customer1]

    (swap! flights
      (fn [flights_collection]
        (if-let [updated-flight (process-customer flights_collection customer2)]

          (assoc flights (:id updated-flight) updated-flight)
          flights_collection)))))


  (reset! finished-processing? true))

Внутри клиентов-процессов Я передаю сбор полетов заказчику-процессу (уведомление процесса-покупателя является вспомогательной функцией для клиентов-заказчиков, и они не являются той же функцией).На этом этапе сбор данных о рейсах представляет собой сбор ссылок на информацию о рейсах. Клиент должен выполнять поиск по списку, и в случае, если клиент имеет право на полет в нем, он использует функцию книги для редактирования рейса.Как мне передать сборы рейсов производственному заказчику?Таким образом, процесс-заказчик не выполняет поиск по ссылкам на полеты и не изменяет ссылки на полеты?

Ниже приведена функция процесса-клиента, за которой следуют его вспомогательные функции.

(defn- process-customer [flights customer]
  "Try to book a flight from `flights` for `customer`, returning the updated
  flight if found, or nil if no suitable flight was found."
  (if-let [{:keys [flight price]} (find-flight flights customer)]
    (let [updated-flight (book flight price (:seats customer))]
      (log "Customer" (:id customer) "booked" (:seats customer)
        "seats on flight" (:id updated-flight) "at $" price " (< budget of $"
        (:budget customer) ").")
      updated-flight)
    (do
      (log "Customer" (:id customer) "did not find a flight.")
      nil)))


(defn filter-pricing-with-n-seats [pricing seats]
  "Get `pricing` for which there are at least `seats` empty seats available."
  (filter #(>= (second %) seats) pricing))

(defn lowest-available-price [flight seats]
  "Returns the lowest price in `flight` for which at least `seats` empty seats
  are available, or nil if none found."
  (-> (:pricing flight)                 ; [[price available taken]]
    (filter-pricing-with-n-seats seats)
    (sort-pricing)
    (first)                             ; [price available taken]
    (first)))                           ; price

(defn- find-flight [flights customer]
  "Find a flight in `flights` that is on the route and within the budget of
  `customer`. If a flight was found, returns {:flight flight :price price},
  else returns nil."
  (let [{:keys [_id from to seats budget]}
          customer
        flights-and-prices
          ; flights that are on the route and within budget, and their price
          (for [f flights
                :when (and (= (:from f) from) (= (:to f) to))
                :let [lowest-price (lowest-available-price f seats)]
                :when (and (some? lowest-price) (<= lowest-price budget))]
            {:flight f :price lowest-price})
        cheapest-flight-and-price
          (first (sort-by :price flights-and-prices))]
    cheapest-flight-and-price))

(defn- book [flight price seats]
  "Updates `flight` to book `seats` at `price`."
  (update flight :pricing
    (fn [pricing]
      (for [[p a t] pricing]
        (if (= p price)
          [p (- a seats) (+ t seats)]
          [p a t])))))



(def finished-processing?
  "Set to true once all customers have been processed, so that sales process
  can end."
  (atom false))

(defn partitionCustomerInput 
  [threads customers]
  (let [partitions (partition-all 
     (Math/ceil (/ (count customers) threads))  customers)]
        partitions))

Ниже приведена основная функция.Он инициализирует полеты и запускает обработку клиентов.

 (defn main []
      (initialize-flights input/flights)
       (let [f1 (future (time (process-customers input/customers)))


        @f1

        )

      (println "Flights:")
      (print-flights (map deref @flights)))
    (main)
    (shutdown-agents)

Ниже приведен список клиентов и рейсов.

(def flights
      [{:id 0
        :from "BRU" :to "ATL"
        :carrier "Delta"
        :pricing [[600 150 0] ; price; # seats available at that price; # seats taken at that price
                  [650  50 0]
                  [700  50 0]
                  [800  50 0]]}
       {:id 1
        :from "BRU" :to "LON"
        :carrier "Brussels Airlines"
        :pricing [[300 150 0]
                  [350  50 0]
                  [370  20 0]
                  [380  30 0]]}
       {:id 2
        :from "BRU" :to "LON"
        :carrier "Brussels Airlines"
        :pricing [[250 100 0]
                  [300  50 0]]}
       {:id 3
        :from "BRU" :to "MAD"
        :carrier "Brussels Airlines"
        :pricing [[200 150 0]
                  [250  50 0]
                  [300 100 0]]}
       {:id 4
        :from "BRU" :to "MAD"
        :carrier "Iberia"
        :pricing [[250 150 0]
                  [300  50 0]]}])

(def customers
  [{:id  0 :from "BRU" :to "ATL" :seats 5 :budget 700}
   {:id  1 :from "BRU" :to "ATL" :seats 5 :budget 550}
   {:id  2 :from "BRU" :to "LON" :seats 6 :budget 270}
   {:id  3 :from "BRU" :to "ATL" :seats 4 :budget 600}
   {:id  4 :from "BRU" :to "LON" :seats 3 :budget 270}
   {:id  5 :from "BRU" :to "LON" :seats 9 :budget 250}
   {:id  6 :from "BRU" :to "MAD" :seats 5 :budget 200}
   {:id  7 :from "BRU" :to "MAD" :seats 9 :budget 150}
   {:id  8 :from "BRU" :to "LON" :seats 5 :budget 250}
   {:id  9 :from "BRU" :to "ATL" :seats 4 :budget 500}
   {:id 10 :from "BRU" :to "MAD" :seats 1 :budget 180}
   {:id 11 :from "BRU" :to "LON" :seats 2 :budget 320}
   {:id 12 :from "BRU" :to "ATL" :seats 3 :budget 850}
   {:id 13 :from "BRU" :to "ATL" :seats 4 :budget 200}])

Кроме того, обратите внимание, что я хочу использовать ссылки для этой реализации, чтобы изменить рейсы какref предлагает поддержку для скоординированного чтения и записи, чтобы изменить полеты атомарно.Я стремлюсь сформулировать высокопараллельное решение для этого приложения, и конфликты не могут быть допущены.

1 Ответ

0 голосов
/ 17 мая 2018

Я думаю, что вам нужен реф вместо атома на верхнем уровне.Похоже, что вам нужно будет согласовать изменение для отдельного рейса и изменения в списке рейсов.Что если один поток изменяет полет, в то время как другой поток удаляет его из списка?Все ваши process-customer побочные эффекты должны быть выполнены в пределах (dosync).

производительности, поэтому все должно быть в порядке, потому что если вы не измените свою ссылку на список рейсов в транзакции, она не будетПовторите попытку другой транзакции, которая изменяет ее.

Другая причина в том, что вы нарушаете очень важное правило для swap!.Функция, переданная в swap!, не должна иметь побочных эффектов, поскольку она может быть повторена STM.Изменение ссылки является побочным эффектом и может привести к трудностям для понимания ошибок.

Так что я бы сделал что-то вроде

(def flights 
  (ref [(ref {:id "flight-1"}) 
        (ref {:id "flight-2"})]))

;; Run a bunch of these in their own threads...
(doseq [customer partitioned-customers]
  (dosync (process-customer customer flights)))

Тогда вы можете точно настроить процесс-customer с помощью alter, commute и ensure, чтобы максимизировать параллелизм и минимизировать повторные попытки.

Надеюсь, это поможет и удачи!

...