У меня есть две реализации бронирования авиабилетов для клиентов в Clojure. Первый - последовательный, а второй - моя попытка распараллелить его. Насколько я понимаю, параллельная реализация должна быть быстрее.
Последовательная реализация использует атомы, в то время как параллельная реализация использует ссылки. На вход поступают только две коллекции - коллекция рейсов и коллекция клиентов. Процесс бронирования включает в себя корректировку рейсов в соответствии с бюджетом клиента и направлениями. Существует также процесс продажи, который корректирует цены на авиабилеты. Обратите внимание, что процесс продаж, а также сборы клиентов и рейсов одинаковы для обеих реализаций.
Ниже приведена последовательная реализация.
(ns flight-reservation
(:require [clojure.string]
#_[input-simple :as input]
[input-random :as input]))
(def logger (agent nil))
(defn log [& msgs] (send logger (fn [_] (apply println msgs))))
;(defn log [& msgs] nil)
(def flights
"All flights are encapsulated in a single atom in this implementation.
You are free to change this to a more appropriate mechanism."
(atom []))
(defn initialize-flights [initial-flights]
"Set `flights` atom to the `initial-flights`."
(reset! flights initial-flights))
(defn print-flights [flights]
"Print `flights`."
(letfn [(pricing->str [pricing]
(->> pricing
(map (fn [[p a t]] (clojure.pprint/cl-format nil "$~3d: ~3d ~3d" p a t)))
(clojure.string/join ", ")))]
(doseq [{:keys [id from to pricing]} flights]
(println (clojure.pprint/cl-format nil "Flight ~3d from ~a to ~a: ~a"
id from to (pricing->str pricing))))))
(defn- update-pricing [flight factor]
"Updated pricing of `flight` with `factor`."
(update flight :pricing
#(map (fn [[p a t]] [(* p factor) a t]) %)))
(defn start-sale [flight-ids]
"Sale: -20% on `flight-ids`."
(log "Start sale for flights" flight-ids)
(swap! flights
(fn [old-flights]
(vec (map
(fn [flight]
(if (contains? flight-ids (:id flight))
(update-pricing flight 0.80)
(defn end-sale [flight-ids]
"End sale: +25% (inverse of -20%) on `flight-ids`."
(log "End sale")
(swap! flights
(fn [old-flights]
(vec (map
(fn [flight]
(if (contains? flight-ids (:id flight))
(update-pricing flight 1.25)
(defn sort-pricing [pricing]
"Sort `pricing` from lowest to highest price."
(sort-by first pricing))
(defn filter-pricing-with-n-seats [pricing seats]
"Get `pricing` for which there are at least `seats` empty seats available."
(filter #(>= (second %) seats) pricing))
(defn lowest-available-price [flight seats]
"Returns the lowest price in `flight` for which at least `seats` empty seats
are available, or nil if none found."
(-> (:pricing flight) ; [[price available taken]]
(filter-pricing-with-n-seats seats)
(first) ; [price available taken]
(first))) ; price
(defn- find-flight [flights customer]
"Find a flight in `flights` that is on the route and within the budget of
`customer`. If a flight was found, returns {:flight flight :price price},
else returns nil."
(let [{:keys [_id from to seats budget]}
; flights that are on the route and within budget, and their price
(for [f flights
:when (and (= (:from f) from) (= (:to f) to))
:let [lowest-price (lowest-available-price f seats)]
:when (and (some? lowest-price) (<= lowest-price budget))]
{:flight f :price lowest-price})
(first (sort-by :price flights-and-prices))]
(defn- book [flight price seats]
"Updates `flight` to book `seats` at `price`."
(update flight :pricing
(fn [pricing]
(for [[p a t] pricing]
(if (= p price)
[p (- a seats) (+ t seats)]
[p a t])))))
(defn- process-customer [flights customer]
"Try to book a flight from `flights` for `customer`, returning the updated
flight if found, or nil if no suitable flight was found."
(if-let [{:keys [flight price]} (find-flight flights customer)]
(let [updated-flight (book flight price (:seats customer))]
(log "Customer" (:id customer) "booked" (:seats customer)
"seats on flight" (:id updated-flight) "at $" price " (< budget of $"
(:budget customer) ").")
(log "Customer" (:id customer) "did not find a flight.")
(def finished-processing?
"Set to true once all customers have been processed, so that sales process
can end."
(atom false))
(defn process-customers [customers]
(Thread/sleep 100)
"Process `customers` one by one."
(doseq [customer customers]
(swap! flights
(fn [flights]
(if-let [updated-flight (process-customer flights customer)]
(assoc flights (:id updated-flight) updated-flight)
(reset! finished-processing? true))
(defn sales-process []
"The sales process starts and ends sales periods, until `finished-processing?`
is true."
(loop []
(let [discounted-flight-ids (->> input/flights
(map :id)
(Thread/sleep input/TIME_BETWEEN_SALES)
(start-sale discounted-flight-ids)
(Thread/sleep input/TIME_OF_SALES)
(end-sale discounted-flight-ids))
(if (not @finished-processing?)
(defn main []
(initialize-flights input/flights)
(let [f1 (future (time (process-customers input/customers)))
f2 (future (sales-process))]
(println "Flights:")
(print-flights @flights))
И ниже параллельная реализация:
(ns flight-reservation
(:require [clojure.string]
#_[input-simple :as input]
[input-random :as input]))
(def N-THREADS 32)
(def logger (agent nil))
(defn log [& msgs] (send logger (fn [_] (apply println msgs))))
;(defn log [& msgs] nil)
(def flights
"All flights are encapsulated in a single atom in this implementation.
You are free to change this to a more appropriate mechanism."
(ref []))
(defn initialize-flights [initial-flights]
"Set `flights` atom to the `initial-flights`."
(dosync(ref-set flights initial-flights)))
(defn print-flights [flights]
"Print `flights`."
(letfn [(pricing->str [pricing]
(->> pricing
(map (fn [[p a t]] (clojure.pprint/cl-format nil "$~3d: ~3d ~3d" p a t)))
(clojure.string/join ", ")))]
(doseq [{:keys [id from to pricing]} flights]
(println (clojure.pprint/cl-format nil "Flight ~3d from ~a to ~a: ~a"
id from to (pricing->str pricing))))))
(defn- update-pricing [flight factor]
"Updated pricing of `flight` with `factor`."
(update flight :pricing
#(map (fn [[p a t]] [(* p factor) a t]) %)))
(defn start-sale [flight-ids]
"Sale: -20% on `flight-ids`."
(log "Start sale for flights" flight-ids)
(alter flights
(fn [old-flights]
(vec (pmap
(fn [flight]
(if (contains? flight-ids (:id flight))
(update-pricing flight 0.80)
(defn end-sale [flight-ids]
"End sale: +25% (inverse of -20%) on `flight-ids`."
(log "End sale")
(alter flights
(fn [old-flights]
(vec (pmap
(fn [flight]
(if (contains? flight-ids (:id flight))
(update-pricing flight 1.25)
(defn sort-pricing [pricing]
"Sort `pricing` from lowest to highest price."
(sort-by first pricing))
(defn filter-pricing-with-n-seats [pricing seats]
"Get `pricing` for which there are at least `seats` empty seats available."
(filter #(>= (second %) seats) pricing))
(defn lowest-available-price [flight seats]
"Returns the lowest price in `flight` for which at least `seats` empty seats
are available, or nil if none found."
(-> (:pricing flight) ; [[price available taken]]
(filter-pricing-with-n-seats seats)
(first) ; [price available taken]
(first))) ; price
(defn- find-flight [flights customer]
"Find a flight in `flights` that is on the route and within the budget of
`customer`. If a flight was found, returns {:flight flight :price price},
else returns nil."
(let [{:keys [_id from to seats budget]}
; flights that are on the route and within budget, and their price
(for [f flights
:when (and (= (:from f) from) (= (:to f) to))
:let [lowest-price (lowest-available-price f seats)]
:when (and (some? lowest-price) (<= lowest-price budget))]
{:flight f :price lowest-price})
(first (sort-by :price flights-and-prices))]
(defn- book [flight price seats]
"Updates `flight` to book `seats` at `price`."
(update flight :pricing
(fn [pricing]
(for [[p a t] pricing]
(if (= p price)
[p (- a seats) (+ t seats)]
[p a t])))))
(defn- process-customer [flights customer]
"Try to book a flight from `flights` for `customer`, returning the updated
flight if found, or nil if no suitable flight was found."
(if-let [{:keys [flight price]} (find-flight flights customer)]
(let [updated-flight (book flight price (:seats customer))]
(log "Customer" (:id customer) "booked" (:seats customer)
"seats on flight" (:id updated-flight) "at $" price " (< budget of $"
(:budget customer) ").")
(log "Customer" (:id customer) "did not find a flight.")
(def finished-processing?
"Set to true once all customers have been processed, so that sales process
can end."
(atom false))
(defn process-customers [customers]
"Process `customers` one by one."
(Thread/sleep 100)
(doseq [customer customers]
(alter flights
(fn [flights]
(if-let [updated-flight (process-customer flights customer)]
(assoc flights (:id updated-flight) updated-flight)
(reset! finished-processing? true))
(defn sales-process []
"The sales process starts and ends sales periods, until `finished-processing?`
is true."
(loop []
(let [discounted-flight-ids (->> input/flights
(pmap :id)
(Thread/sleep input/TIME_BETWEEN_SALES)
(start-sale discounted-flight-ids)
(Thread/sleep input/TIME_OF_SALES)
(end-sale discounted-flight-ids)
(if (not @finished-processing?)
(defn partitionCustomerInput
[threads customers]
(let [partitions (partition-all
(Math/ceil (/ (count customers) threads)) customers)]
(defn main []
(initialize-flights input/flights)
(let [f1 (time (doall (pmap process-customers (partitionCustomerInput N-THREADS input/customers))))
f2 (future (sales-process))]
(println "Flights:")
(print-flights @flights))
Последовательная реализация быстрее, чем параллельная реализация. Я не понимаю, почему, как и в параллельной реализации, внутри main, я распараллеливаю обработку клиентов с помощью pmap после разбиения коллекции клиентов на разделы, равные количеству желаемых потоков.