Моя попытка реализовать BlockingQueue, содержащую только уникальные элементы - PullRequest
0 голосов
/ 07 марта 2020

Моя проблема очень похожа на: Java очередь блокировки, содержащая только уникальные элементы

Но мне нужно всего 2 метода: put и take из BlockingQueue Интерфейс , Ответы, приведенные в этом топи c, на мой взгляд, не очень хороши. Первый из них работает на 2 коллекциях, а второй слишком большой и не проверен вообще.

Это моя попытка:

public class ConcurrentBufferSet<T> implements ConcurrentBuffer<T> {

    private Set<T> set = new LinkedHashSet<>();

    @Override
    public void put(T t) throws InterruptedException {
        synchronized (this) {
            set.add(t);
            notify();
        }
    }

    @Override
    public T take () throws InterruptedException {
        synchronized (this) {
            while (set.isEmpty()) {
                wait();
            }
            Iterator<T> it = set.iterator();
            T t = it.next();
            it.remove();
            notify();
            return t;
        }
    }
}

, и это тест, который время от времени терпит неудачу из-за тайм-аута:

    @Test(timeout = 300)
    public void testManyThreadsOnSingleBuffer() throws InterruptedException {

        // given:
        AtomicInteger putCount = new AtomicInteger();
        AtomicInteger takenCount = new AtomicInteger();
        ConcurrentBuffer<Integer> buffer = new ConcurrentBufferSet<>();

        int[] producerIterations = {54, 93, 1, 44, 20};
        int[] consumerIterations = {90, 99, 10, 2, 11};
        int producerIterationsSum = Arrays.stream(producerIterations).sum();
        int consumerIterationsSum = Arrays.stream(consumerIterations).sum();

        // when:
        ExecutorService exec = Executors.newFixedThreadPool(10);
        for(int i = 0; i < producerIterations.length; i ++) {
            exec.execute(createProducerRunnable(buffer, putCount, producerIterations[i]));
            exec.execute(createConsumerRunnable(buffer, takenCount, consumerIterations[i]));
        }
        exec.shutdown();
        exec.awaitTermination(200, TimeUnit.MILLISECONDS);

        // then:
        assertEquals(producerIterationsSum, consumerIterationsSum);
        assertEquals(putCount.get(), takenCount.get());
        assertEquals(producerIterationsSum, putCount.get());
        assertEquals(consumerIterationsSum, takenCount.get());
        assertEquals(producerIterationsSum - consumerIterationsSum, buffer.size());
    }

Когда я заменяю свою реализацию на оригинальный LinkedBlockingQueue, то тесты всегда работают хорошо. Что не так с моей реализацией?

...