Как выполнять параллельные транзакции в Clojure - PullRequest
0 голосов
/ 30 апреля 2019

У меня есть последовательность клиентов, которая должна обрабатываться параллельно.Я пытался использовать pmap для этого.Результат мучительно медленный, намного медленнее, чем последовательная реализация.Внутренняя функция process-customer имеет транзакцию.Очевидно, что pmap запускает все транзакции одновременно, и они заканчивают тем, что пытались убить производительность.Каков наилучший способ распараллелить это?

(defn process-customers [customers]
  (doall 
    (pmap 
      (fn [sub-customers]
        (doseq [customer sub-customers]
          (process-customer customer)))
      (partition-all 10 customers))))

EDIT : Функция process-customer включает следующие шаги.Я пишу шаги для краткости.Все шаги выполняются внутри транзакции, чтобы гарантировать, что другая параллельная транзакция не вызовет несоответствий, таких как отрицательный запас.

(defn- process-customer [customer]
  "Process `customer`. Consists of three steps:
  1. Finding all stores in which the requested products are still available.
  2. Sorting the found stores to find the cheapest (for the sum of all products).
  3. Buying the products by updating the `stock`.
)

РЕДАКТИРОВАТЬ 2: Следующая версия process-customers имеет ту же производительность, что и параллельная process-customers выше.Ниже очевидно, что это последовательно.

(defn process-customers [customers]
  "Process `customers` one by one. In this code, this happens sequentially."
  (doseq [customer customers]
    (process-customer customer)))

1 Ответ

1 голос
/ 05 мая 2019

Я предполагаю, что ваша транзакция блокируется на складе для полного жизненного цикла process-customer. Это будет медленным, так как все клиенты участвуют в гонках по одной и той же вселенной магазинов Если вы можете разделить процесс на две фазы: 1) цитирование и 2) выполнение и применение транзакции только к (2), то производительность должна быть намного лучше. Или, если вы купитесь на agent программирование, у вас будет автоматически определена граница транзакции на уровне сообщений. Вот один пример, который вы можете рассмотреть:

(defn get-best-deal
  "Returns the best deal for a given order with given stores (agent)"
  [stores order]
  ;;
  ;; request for quotation from 1000 stores (in parallel)
  ;;
  (doseq [store stores]
    (send store get-quote order))
  ;;
  ;; wait for reply, up to 0.5s
  ;;
  (apply await-for 500 stores)
  ;;
  ;; sort and find the best store
  ;;
  (when-let [best-store (->> stores
                             (filter (fn [store] (get-in @store [:quotes order])))
                             (sort-by (fn [store] (->> (get-in @store [:quotes order])
                                                       vals
                                                       (reduce +))))
                             first)]
    {:best-store best-store
     :invoice-id (do
                   ;; execute the order
                   (send best-store fulfill order)
                   ;; wait for the transaction to complete
                   (await best-store)
                   ;; get an invoice id
                   (get-in @best-store [:invoices order]))}))

и найти лучшие предложения из 1000 магазинов на 100 заказов (всего 289 позиций) из 100 товаров:

(->> orders
       (pmap (partial get-best-deal stores))
       (filter :invoice-id)
       count
       time)
;; => 57
;; "Elapsed time: 312.002328 msecs"

Пример бизнес-логики:

(defn get-quote
  "issue a quote by checking inventory"
  [store {:keys [order-items] :as order}]
  (if-let [quote (->> order-items
                   (reduce reduce-inventory
                           {:store store
                            :quote nil})
                   :quote)]
    ;; has inventory to generate a quote
    (assoc-in store [:quotes order] quote)
    ;; no inventory
    (update store :quotes dissoc order)))

(defn fulfill
  "fulfill an order if previuosly quoted"
  [store order]
  (if-let [quote (get-in store [:quotes order])]
    ;; check inventory again and generate invoice
    (let [[invoice inventory'] (check-inventory-and-generate-invoice store order)]
      (cond-> store
        invoice (->
                  ;; register invoice
                  (assoc-in [:invoices order] invoice)
                  ;; invalidate the quote
                  (update :quotes dissoc order)
                  ;; update inventory
                  (assoc :inventory inventory'))))
    ;; not quoted before
    store))


...