Моя проблема очень похожа на: Java очередь блокировки, содержащая только уникальные элементы
Но мне нужно всего 2 метода: put
и take
из BlockingQueue
Интерфейс , Ответы, приведенные в этом топи c, на мой взгляд, не очень хороши. Первый из них работает на 2 коллекциях, а второй слишком большой и не проверен вообще.
Это моя попытка:
public class ConcurrentBufferSet<T> implements ConcurrentBuffer<T> {
private Set<T> set = new LinkedHashSet<>();
@Override
public void put(T t) throws InterruptedException {
synchronized (this) {
set.add(t);
notify();
}
}
@Override
public T take () throws InterruptedException {
synchronized (this) {
while (set.isEmpty()) {
wait();
}
Iterator<T> it = set.iterator();
T t = it.next();
it.remove();
notify();
return t;
}
}
}
, и это тест, который время от времени терпит неудачу из-за тайм-аута:
@Test(timeout = 300)
public void testManyThreadsOnSingleBuffer() throws InterruptedException {
// given:
AtomicInteger putCount = new AtomicInteger();
AtomicInteger takenCount = new AtomicInteger();
ConcurrentBuffer<Integer> buffer = new ConcurrentBufferSet<>();
int[] producerIterations = {54, 93, 1, 44, 20};
int[] consumerIterations = {90, 99, 10, 2, 11};
int producerIterationsSum = Arrays.stream(producerIterations).sum();
int consumerIterationsSum = Arrays.stream(consumerIterations).sum();
// when:
ExecutorService exec = Executors.newFixedThreadPool(10);
for(int i = 0; i < producerIterations.length; i ++) {
exec.execute(createProducerRunnable(buffer, putCount, producerIterations[i]));
exec.execute(createConsumerRunnable(buffer, takenCount, consumerIterations[i]));
}
exec.shutdown();
exec.awaitTermination(200, TimeUnit.MILLISECONDS);
// then:
assertEquals(producerIterationsSum, consumerIterationsSum);
assertEquals(putCount.get(), takenCount.get());
assertEquals(producerIterationsSum, putCount.get());
assertEquals(consumerIterationsSum, takenCount.get());
assertEquals(producerIterationsSum - consumerIterationsSum, buffer.size());
}
Когда я заменяю свою реализацию на оригинальный LinkedBlockingQueue, то тесты всегда работают хорошо. Что не так с моей реализацией?