Speeding up Clojure transactions - PullRequest
0 votes
/ May 9, 2018

I have two implementations of a flight-reservation system for customers in Clojure. The first is sequential, and the second is my attempt to parallelize it. As I understand it, the parallel implementation should be faster. The sequential implementation uses atoms, while the parallel one uses refs. The input consists of just two collections: a collection of flights and a collection of customers. The booking process updates the flights according to each customer's budget and destination. There is also a sales process that adjusts flight prices. Note that the sales process, as well as the customer and flight collections, are the same for both implementations.

Below is the sequential implementation.

(ns flight-reservation
  (:require [clojure.string]
            [clojure.pprint]
            #_[input-simple :as input]
            [input-random :as input]))

(def logger (agent nil))
(defn log [& msgs] (send logger (fn [_] (apply println msgs))))
;(defn log [& msgs] nil)

(def flights
  "All flights are encapsulated in a single atom in this implementation.
  You are free to change this to a more appropriate mechanism."
  (atom []))

(defn initialize-flights [initial-flights]
  "Set `flights` atom to the `initial-flights`."
  (reset! flights initial-flights))

(defn print-flights [flights]
  "Print `flights`."
  (letfn [(pricing->str [pricing]
            (->> pricing
              (map (fn [[p a t]] (clojure.pprint/cl-format nil "$~3d: ~3d ~3d" p a t)))
              (clojure.string/join ", ")))]
    (doseq [{:keys [id from to pricing]} flights]
      (println (clojure.pprint/cl-format nil "Flight ~3d from ~a to ~a: ~a"
        id from to (pricing->str pricing))))))

(defn- update-pricing [flight factor]
  "Updated pricing of `flight` with `factor`."
  (update flight :pricing
    #(map (fn [[p a t]] [(* p factor) a t]) %)))

(defn start-sale [flight-ids]
  "Sale: -20% on `flight-ids`."
  (log "Start sale for flights" flight-ids)
  (swap! flights
    (fn [old-flights]
      (vec (map
             (fn [flight]
               (if (contains? flight-ids (:id flight))
                 (update-pricing flight 0.80)
                 flight))
             old-flights)))))

(defn end-sale [flight-ids]
  "End sale: +25% (inverse of -20%) on `flight-ids`."
  (log "End sale")
  (swap! flights
    (fn [old-flights]
      (vec (map
             (fn [flight]
               (if (contains? flight-ids (:id flight))
                 (update-pricing flight 1.25)
                 flight))
             old-flights)))))

(defn sort-pricing [pricing]
  "Sort `pricing` from lowest to highest price."
  (sort-by first pricing))

(defn filter-pricing-with-n-seats [pricing seats]
  "Get `pricing` for which there are at least `seats` empty seats available."
  (filter #(>= (second %) seats) pricing))

(defn lowest-available-price [flight seats]
  "Returns the lowest price in `flight` for which at least `seats` empty seats
  are available, or nil if none found."
  (-> (:pricing flight)                 ; [[price available taken]]
    (filter-pricing-with-n-seats seats)
    (sort-pricing)
    (first)                             ; [price available taken]
    (first)))                           ; price

(defn- find-flight [flights customer]
  "Find a flight in `flights` that is on the route and within the budget of
  `customer`. If a flight was found, returns {:flight flight :price price},
  else returns nil."
  (let [{:keys [_id from to seats budget]}
          customer
        flights-and-prices
          ; flights that are on the route and within budget, and their price
          (for [f flights
                :when (and (= (:from f) from) (= (:to f) to))
                :let [lowest-price (lowest-available-price f seats)]
                :when (and (some? lowest-price) (<= lowest-price budget))]
            {:flight f :price lowest-price})
        cheapest-flight-and-price
          (first (sort-by :price flights-and-prices))]
    cheapest-flight-and-price))

(defn- book [flight price seats]
  "Updates `flight` to book `seats` at `price`."
  (update flight :pricing
    (fn [pricing]
      (for [[p a t] pricing]
        (if (= p price)
          [p (- a seats) (+ t seats)]
          [p a t])))))

(defn- process-customer [flights customer]
  "Try to book a flight from `flights` for `customer`, returning the updated
  flight if found, or nil if no suitable flight was found."
  (if-let [{:keys [flight price]} (find-flight flights customer)]
    (let [updated-flight (book flight price (:seats customer))]
      (log "Customer" (:id customer) "booked" (:seats customer)
        "seats on flight" (:id updated-flight) "at $" price " (< budget of $"
        (:budget customer) ").")
      updated-flight)
    (do
      (log "Customer" (:id customer) "did not find a flight.")
      nil)))

(def finished-processing?
  "Set to true once all customers have been processed, so that sales process
  can end."
  (atom false))

(defn process-customers [customers]
  (Thread/sleep 100)
  "Process `customers` one by one."
  (doseq [customer customers]
    (swap! flights
      (fn [flights]
        (if-let [updated-flight (process-customer flights customer)]
          (assoc flights (:id updated-flight) updated-flight)
          flights))))
  (reset! finished-processing? true))

(defn sales-process []
  "The sales process starts and ends sales periods, until `finished-processing?`
  is true."
  (loop []
    (let [discounted-flight-ids (->> input/flights
                                     (map :id)
                                     shuffle
                                     (take input/NUMBER_OF_DISCOUNTED_FLIGHTS)
                                     set)]
      (Thread/sleep input/TIME_BETWEEN_SALES)
      (start-sale discounted-flight-ids)
      (Thread/sleep input/TIME_OF_SALES)
      (end-sale discounted-flight-ids))
    (if (not @finished-processing?)
      (recur))))

(defn main []
  (initialize-flights input/flights)
  (let [f1 (future (time (process-customers input/customers)))
        f2 (future (sales-process))]
    @f1
    @f2)
  (println "Flights:")
  (print-flights @flights))

(main)
(shutdown-agents)

And below is the parallel implementation:

(ns flight-reservation
  (:require [clojure.string]
            [clojure.pprint]
            #_[input-simple :as input]
            [input-random :as input]))
(def N-THREADS 32) 

(def logger (agent nil))
(defn log [& msgs] (send logger (fn [_] (apply println msgs))))
;(defn log [& msgs] nil)

(def flights
  "All flights are encapsulated in a single atom in this implementation.
  You are free to change this to a more appropriate mechanism."
  (ref []))

(defn initialize-flights [initial-flights]
  "Set `flights` atom to the `initial-flights`."
  (dosync(ref-set flights initial-flights)))

(defn print-flights [flights]
  "Print `flights`."
  (letfn [(pricing->str [pricing]
            (->> pricing
              (map (fn [[p a t]] (clojure.pprint/cl-format nil "$~3d: ~3d ~3d" p a t)))
              (clojure.string/join ", ")))]
    (doseq [{:keys [id from to pricing]} flights]
      (println (clojure.pprint/cl-format nil "Flight ~3d from ~a to ~a: ~a"
        id from to (pricing->str pricing))))))

(defn- update-pricing [flight factor]
  "Updated pricing of `flight` with `factor`."
  (update flight :pricing
    #(map (fn [[p a t]] [(* p factor) a t]) %)))

(defn start-sale [flight-ids]
  "Sale: -20% on `flight-ids`."
  (log "Start sale for flights" flight-ids)
  (dosync
  (alter flights
    (fn [old-flights]
      (vec (pmap
             (fn [flight]
               (if (contains? flight-ids (:id flight))
                 (update-pricing flight 0.80)
                 flight))
             old-flights))))))

(defn end-sale [flight-ids]
  "End sale: +25% (inverse of -20%) on `flight-ids`."
  (log "End sale")
  (dosync
  (alter flights
    (fn [old-flights]
      (vec (pmap
             (fn [flight]
               (if (contains? flight-ids (:id flight))
                 (update-pricing flight 1.25)
                 flight))
             old-flights))))))

(defn sort-pricing [pricing]
  "Sort `pricing` from lowest to highest price."
  (sort-by first pricing))

(defn filter-pricing-with-n-seats [pricing seats]
  "Get `pricing` for which there are at least `seats` empty seats available."
  (filter #(>= (second %) seats) pricing))

(defn lowest-available-price [flight seats]
  "Returns the lowest price in `flight` for which at least `seats` empty seats
  are available, or nil if none found."
  (-> (:pricing flight)                 ; [[price available taken]]
    (filter-pricing-with-n-seats seats)
    (sort-pricing)
    (first)                             ; [price available taken]
    (first)))                           ; price

(defn- find-flight [flights customer]
  "Find a flight in `flights` that is on the route and within the budget of
  `customer`. If a flight was found, returns {:flight flight :price price},
  else returns nil."
  (let [{:keys [_id from to seats budget]}
          customer
        flights-and-prices
          ; flights that are on the route and within budget, and their price
          (for [f flights
                :when (and (= (:from f) from) (= (:to f) to))
                :let [lowest-price (lowest-available-price f seats)]
                :when (and (some? lowest-price) (<= lowest-price budget))]
            {:flight f :price lowest-price})
        cheapest-flight-and-price
          (first (sort-by :price flights-and-prices))]
    cheapest-flight-and-price))

(defn- book [flight price seats]
  "Updates `flight` to book `seats` at `price`."
  (update flight :pricing
    (fn [pricing]
      (for [[p a t] pricing]
        (if (= p price)
          [p (- a seats) (+ t seats)]
          [p a t])))))

(defn- process-customer [flights customer]
  "Try to book a flight from `flights` for `customer`, returning the updated
  flight if found, or nil if no suitable flight was found."
  (if-let [{:keys [flight price]} (find-flight flights customer)]
    (let [updated-flight (book flight price (:seats customer))]
      (log "Customer" (:id customer) "booked" (:seats customer)
        "seats on flight" (:id updated-flight) "at $" price " (< budget of $"
        (:budget customer) ").")
      updated-flight)
    (do
      (log "Customer" (:id customer) "did not find a flight.")
      nil)))

(def finished-processing?
  "Set to true once all customers have been processed, so that sales process
  can end."
  (atom false))

(defn process-customers [customers]
  "Process `customers` one by one."
  (Thread/sleep 100)
  (dosync
  (doseq [customer customers]

    (alter flights
      (fn [flights]
        (if-let [updated-flight (process-customer flights customer)]
          (assoc flights (:id updated-flight) updated-flight)
          flights)))))
  (reset! finished-processing? true))

(defn sales-process []
  "The sales process starts and ends sales periods, until `finished-processing?`
  is true."
  (loop []
    (let [discounted-flight-ids (->> input/flights
                                     (pmap :id)
                                     shuffle
                                     (take input/NUMBER_OF_DISCOUNTED_FLIGHTS)
                                     set)]
      (Thread/sleep input/TIME_BETWEEN_SALES)
      (start-sale discounted-flight-ids)
      (Thread/sleep input/TIME_OF_SALES)
      (end-sale discounted-flight-ids)
    (if (not @finished-processing?)
      (recur)))))

(defn partitionCustomerInput 
  [threads customers]
  (let [partitions (partition-all 
     (Math/ceil (/ (count customers) threads))  customers)]
        partitions))

(defn main []
  (initialize-flights input/flights)
  (let [f1 (time (doall (pmap process-customers (partitionCustomerInput N-THREADS input/customers))))
        f2 (future (sales-process))]
    @f2)
  (println "Flights:")
  (print-flights @flights))

(main)
(shutdown-agents)

The sequential implementation is faster than the parallel one. I don't understand why, because in the parallel implementation, inside main, I parallelize the customer processing with pmap after splitting the customer collection into as many partitions as the desired number of threads.

Answers [ 2 ]

0 votes
/ May 20, 2018

The problem with parallelism and pmap is that pmap still guarantees the ordering of its return values. This means that if you have a pmap pool of 8 cores/workers and one of them blocks, the whole batch is blocked for that long.

Take a look at the unordered pmap, 'upmap', in Claypoole, where the ordering of return values is dropped in favor of returning results sooner. There, one blocking operation still leaves 7 cores free for the faster operations.
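To illustrate the difference between input-order and completion-order results without pulling in Claypoole, here is a minimal sketch that uses only clojure.core and java.util.concurrent. The names `slow-identity` and `in-completion-order` are hypothetical and introduced only for this illustration; this is not Claypoole's `upmap` implementation, just the same idea of handing results back as soon as they finish.

```clojure
(import '(java.util.concurrent Executors ExecutorCompletionService))

(defn slow-identity
  "Hypothetical task: sleeps for `ms` milliseconds, then returns `ms`."
  [ms]
  (Thread/sleep (long ms))
  ms)

(defn in-completion-order
  "Runs (f x) for every x in `coll` on a fixed pool of `n-threads` threads and
  returns a vector of results in the order the tasks finish, not input order."
  [n-threads f coll]
  (let [pool (Executors/newFixedThreadPool n-threads)
        ecs  (ExecutorCompletionService. pool)
        xs   (vec coll)]
    (try
      ;; Submit every task up front; each one is a zero-argument fn (a Callable).
      (doseq [x xs]
        (.submit ecs (fn [] (f x))))
      ;; `.take` blocks until the next task completes, whichever one that is.
      (vec (repeatedly (count xs) #(.get (.take ecs))))
      (finally
        (.shutdown pool)))))
```

Under these assumptions, `(doall (pmap slow-identity [300 20 100]))` always returns `(300 20 100)`, because pmap preserves input order, while `(in-completion-order 3 slow-identity [300 20 100])` would typically return `[20 100 300]`, since the fastest task is handed back first.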

0 голосов
/ May 11, 2018

You won't see a speedup if the function you want to run in parallel is trivial:

(->> input/flights
     (pmap :id) ;; map or pmap here
     ...)

In this case, the cost of the function being run in parallel (returning the value for the :id key) is far lower than the cost of dispatching the function calls in parallel (and assembling the results again), so it is no surprise that the sequential version runs faster.

See this example:

user=> (time (count (map inc (range 1000))))
"Elapsed time: 0.391237 msecs"
1000
user=> (time (count (pmap inc (range 1000))))
"Elapsed time: 9.451128 msecs"
1000

A trivial operation (inc) is not an expensive computation that would benefit from parallel execution. I think the same idea applies to the other uses of pmap in your parallel version of the code.
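For contrast, here is a minimal sketch of the opposite case, where each call is expensive enough for pmap to pay off. `slow-inc` and `elapsed-ms` are hypothetical helpers introduced for this illustration, and the 50 ms sleep merely stands in for real work; actual timings depend on the machine.

```clojure
(defn slow-inc
  "Hypothetical stand-in for an expensive computation: waits 50 ms, then increments."
  [x]
  (Thread/sleep 50)
  (inc x))

(defn elapsed-ms
  "Calls the zero-argument function `thunk` and returns the elapsed
  wall-clock time in milliseconds."
  [thunk]
  (let [start (System/nanoTime)]
    (thunk)
    (/ (- (System/nanoTime) start) 1e6)))

(def xs (range 8))

;; `doall` forces the lazy sequences so the work happens inside the timed call.
(def sequential-ms (elapsed-ms #(doall (map slow-inc xs))))
(def parallel-ms   (elapsed-ms #(doall (pmap slow-inc xs))))

(println "map:" sequential-ms "ms, pmap:" parallel-ms "ms")
```

With eight calls of about 50 ms each, the map version has to wait for them one after another, while pmap overlaps the waits, so here the coordination overhead is small compared with the work being done per element.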

You may also find the pmap usage examples on Clojuredocs useful.
