зачем использовать ожидание / уведомление при создании нового потока? - PullRequest
2 голосов
/ 10 июля 2020

Я сделал несколько приблизительных расчетов времени отклика системы на ожидание / уведомление по сравнению с созданием нового потока более 100 потоков, и похоже, что это почти то же самое - с stddev намного выше для ожидания / уведомления, поэтому похоже на создание нового потоки могут быть более эффективными.

Значения в наносекундах

threads
  {:min 193161, :mean 1410865.0, :stddev 444840.72741115646, :skew -0.755184951510553}

wait/notify
  {:min 159629, :mean 1332270.8, :stddev 703829.1499013959, :skew 0.07768945756036612}

Предполагается, что это будет более эффективно?

Изменить Код находится в закрытии. Я помещу несколько комментариев в

(def ---thread-benchmark---
  (let [summary (cc/queue)]
    (dotimes [i 100]
      (let [queue (cc/queue)
   
            ;; start time is called
            start (System/nanoTime)
   
            ;; immediately calls future
            _     (future
   
                    ;; and logs the time at which the future is created
                    (let [end  (System/nanoTime)
                          start (cc/take queue)]
                      
                      ;; combines both start and end times
                      (cc/put summary {:start start
                                       :end end
                                       :total (- end start)} )))
            _      (cc/put queue start)]))
    
    ;; wait for threads to complete 
    (Thread/sleep 100)
   
    ;; output results
    (cc/drain summary)))

Настройка ожидания / уведомления:

(defrecord TestStore [instance threads lock])

;; This sets up a store (which has the lock and holds all the threads)

(def -store-
  (let [instance (atom nil)
        threads  (atom {})
        
        ;; This is a single lock
        lock     (Object.)
        watch-fn  (fn [_ _ _ n]
                    (let [;; when watcher is triggered
                          ;; make note of time
                          start (System/nanoTime)
                          
                          ;; notifies all watchers on the lock
                          _ (locking lock
                              (.notifyAll lock))
                          
                          queues (mapv :queue (vals @threads))]
                      
                      ;; send start time to each thread
                      (doseq [q queues]
                        (cc/put q start))))
        _ (add-watch instance :primary watch-fn)]
    (TestStore. instance threads lock)))


;; This holds the queue (which gets sent the results)
(def -results- (atom (cc/queue)))

;; This registers a thread with the store and starts it
(defn register-handler
  [{:keys [instance lock threads] :as store} id f]
  (let [queue  (cc/queue)

        ;;Creates the thread function
        thread-fn (fn []
                    (loop []
                      ;; When it starts, the thread is locked
                      (locking lock (.wait lock))

                      ;; As soon as it get notify, it logs the end time
                      (let [end   (System/nanoTime)
                            start (cc/take queue)]

                        ;; And puts the results on the store
                        (cc/put @-results- {:start start
                                            :end end
                                            :total (- end start)}))
                      (recur)))

        ;; Creates the new Thread
        thread (Thread. thread-fn)

        ;; Registers with the store (so it can send start times)
        _ (swap! threads
                 assoc id {:queue queue
                           :thread thread})]

    ;; starts each thread
    (.start thread)
    store))

Подождите / уведомите ЭТАЛОН:

;; REGISTERS 100 THREADS to WAIT on a single lock
(dotimes [i 100]
  (register-handler -store- (str (rand)) (fn [m])))

(def ---wait-notify-benchmark---

  ;; Setup queue (to receive events)
  (let [_ (reset! -results- (cc/queue))]
    
    ;; Triggers the Notify Function 
    (reset! (:instance -store-) 1)

    ;; Wait for results
    (Thread/sleep 100)
    (cc/drain @-results-)))

1 Ответ

1 голос
/ 10 июля 2020

В моем тесте создание нового потока каждый раз вместо передачи элементов другому потоку через Queue и wait() / notify() на 2 порядка медленнее.

Либо ваш тест ошибочен или ваш сценарий более конкретен c, и в этом случае вы должны его описать.

Если вы хотите уменьшить задержку между публикацией элемента и его потреблением, вам потребуется занято-ожидание . wait() / notify() и блокирует вызов ядра, что влечет за собой некоторые накладные расходы. Вы можете использовать JCTools или Agrona.

class WaitNotify {    
    static final int iterations = 10_000_000;

    private static void benchmark() throws InterruptedException {
        Consumer consumer = new Consumer();
        consumer.start();
        long start = System.nanoTime();
        for (int i = 0; i < iterations; i++) {
            synchronized (consumer) {
                consumer.queue.add(new Object());
                consumer.notify();
            }
        }
        consumer.join();
        long elapsed = System.nanoTime() - start;
        System.out.println((double) elapsed / iterations);
    }
}

class Consumer extends Thread {
    final Queue<Object> queue = new ArrayDeque<>();

    @Override
    public synchronized void run() {
        try {
            for (int i = 0; i < WaitNotify.iterations; ) {
                wait();
                while (!queue.isEmpty()) {
                    queue.poll();
                    i++;
                }
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
class NewThread {    
    private static final int iterations = 100_000;

    private static void benchmark() throws InterruptedException {
        long start = System.nanoTime();
        for (int i = 0; i < iterations; i++) {
            Thread consumer = new Consumer(new Object());
            consumer.start();
            consumer.join();
        }
        long elapsed = System.nanoTime() - start;
        System.out.println((double) elapsed / iterations);
    }
}

class Consumer extends Thread {
    Consumer(Object e) {}

    @Override
    public void run() {}
}
...