Эффективные потоки потребителей / производителей для тестирования Wait / Notify или CountDownLatch или других - PullRequest
0 голосов
/ 26 марта 2019

У меня есть несколько модульных тестов, которые запускают некоторые вычисления в другом потоке. В основной ветке я хотел бы дождаться результатов, которые будут готовы, а затем проверить их с некоторыми утверждениями.

Несколько способов сделать это включают wait() / notify(), CountDownLatch(), замки и, возможно, другие. Какие критерии использовать при выборе реализации? С точки зрения производительности CountDownLatch кажется самым быстрым, поэтому я бы выбрал его в качестве «по умолчанию».

package org.anisotrop.jmh;

import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.Warmup;
import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.RunnerException;
import org.openjdk.jmh.runner.options.Options;
import org.openjdk.jmh.runner.options.OptionsBuilder;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import static org.junit.Assert.assertTrue;

@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.NANOSECONDS)
@Warmup(iterations = 5, timeUnit = TimeUnit.NANOSECONDS)
@Measurement(iterations = 10, timeUnit = TimeUnit.NANOSECONDS)
@Fork(value = 3, warmups = 1)
@State(Scope.Benchmark)
public class CountDownLatchVsWaitNotify {

    private static class WaitLock {
        boolean b;
    }

    public static void main(String[] args) throws RunnerException {
        Options opt = new OptionsBuilder()
                .include(CountDownLatchVsWaitNotify.class.getName())
                .build();
        new Runner(opt).run();
    }

    @Benchmark
    public void countDownLatch() throws InterruptedException {
        final int[] val = new int[1];
        final CountDownLatch latch = new CountDownLatch(1);
        Runnable producer = new Runnable() {
            public void run() {
                val[0] = 100;
                latch.countDown();
            }
        };
        Thread p = new Thread(producer);
        p.start();

        latch.await(5, TimeUnit.SECONDS);
        assertTrue(100 == val[0]);
        return;
    }

    @Benchmark
    public void waitNotify() throws InterruptedException {
        final int[] val = new int[1];
        final WaitLock waitLock = new WaitLock();
        waitLock.b = false;
        Runnable producer = new Runnable() {
            public void run() {
                synchronized (waitLock) {
                    val[0] = 100;
                    waitLock.b = true;
                    waitLock.notify();
                }
            }
        };
        Thread p = new Thread(producer);
        p.start();
        synchronized (waitLock) {
            while (!waitLock.b) {
                waitLock.wait();
            }
        }
        assertTrue(100 == val[0]);
        return;
    }

    @Benchmark
    public void reentrantLock() throws InterruptedException {
        final int[] val = new int[1];
        final Lock lock = new ReentrantLock();
        final Condition produced = lock.newCondition();

        Runnable producer = new Runnable() {
            public void run() {
                lock.lock();
                try {
                    val[0] = 100;
                    produced.signal();
                } finally {
                    lock.unlock();
                }
            }
        };
        Thread p = new Thread(producer);
        p.start();

        lock.lock();
        try {
            while (val[0] == 0) {
                produced.await();
            }
        } finally {
            lock.unlock();
        }
        assertTrue(100 == val[0]);
        return;
    }

    @Benchmark
    public void blockingQueue() throws InterruptedException {
        final LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue(1);
        Runnable producer = new Runnable() {
            public void run() {
                try {
                    queue.put(Integer.valueOf(100));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };
        Thread p = new Thread(producer);
        p.start();
        Integer val = queue.poll(5, TimeUnit.SECONDS);
        assertTrue(100 == val);
        return;
    }

    @Benchmark
    public void threadJoin() throws InterruptedException {
        final int[] val = new int[1];
        Runnable producer = new Runnable() {
            public void run() {
                val[0] = 100;
            }
        };
        Thread p = new Thread(producer);
        p.start();
        p.join();
        assertTrue(100 == val[0]);
        return;
    }
}

Тесты на моей машине:

# Run complete. Total time: 00:06:16

Benchmark                                          Mode  Samples      Score  Score error  Units
o.a.j.CountDownLatchVsWaitNotify.blockingQueue     avgt       30  69841.054    17032.222  ns/op
o.a.j.CountDownLatchVsWaitNotify.countDownLatch    avgt       30  58338.648     6457.794  ns/op
o.a.j.CountDownLatchVsWaitNotify.reentrantLock     avgt       30  90087.940    34442.019  ns/op
o.a.j.CountDownLatchVsWaitNotify.threadJoin        avgt       30  68214.133    23156.421  ns/op
o.a.j.CountDownLatchVsWaitNotify.waitNotify        avgt       30  69560.902    12001.154  ns/op
...