Я думаю, что это на самом деле довольно сложно.Я не знаю, как написать правильную версию, используя подход, основанный на множестве.Например, следующий подход неверен:
public class ThreadsStopWorkingWrong {
ConcurrentLinkedQueue queue = new ConcurrentLinkedQueue();
ConcurrentHashMap activeThreads = new ConcurrentHashMap();
volatile int prozessedCount = 0;
volatile boolean stop = false;
@Interleave(group = ThreadsStopWorkingWrong.class, threadCount = 1)
public void readFromQueue() {
int prozessAdditionalElements = 1;
while (!stop) {
Object element = queue.poll();
if (element != null) {
activeThreads.put(Thread.currentThread(), "");
if (prozessAdditionalElements > 0) {
prozessAdditionalElements--;
queue.offer("2");
}
prozessedCount++;
} else {
activeThreads.remove(Thread.currentThread());
}
}
}
@Interleave(group = ThreadsStopWorkingWrong.class, threadCount = 1)
public void waitTillProzessed() throws InterruptedException {
while (!queue.isEmpty() && !activeThreads.isEmpty()) {
Thread.sleep(1);
}
assertEquals(2, prozessedCount);
}
@Test
public void test() throws InterruptedException {
queue.offer("1");
Thread worker = new Thread(() -> readFromQueue());
worker.start();
waitTillProzessed();
worker.join();
}
}
Проблема в том, что при опросе сообщения из очереди вы еще не добавили поток в активированный набор, так что! Queue.isEmpty () &&! activeThreads.isEmpty () становится истинным.Что работает, так это использование счетчика сообщений, как в следующем примере:
public class ThreadsStopWorkingCorrect {
ConcurrentLinkedQueue queue = new ConcurrentLinkedQueue();
AtomicLong messageCount = new AtomicLong();
volatile int prozessedCount = 0;
volatile boolean stop = false;
@Interleave(group = ThreadsStopWorkingCorrect.class, threadCount = 1)
public void readFromQueue() {
int prozessAdditionalElements = 1;
while (!stop) {
Object element = queue.poll();
if (element != null) {
if (prozessAdditionalElements > 0) {
prozessAdditionalElements--;
queue.offer("2");
messageCount.incrementAndGet();
}
prozessedCount++;
messageCount.decrementAndGet();
}
}
}
@Interleave(group = ThreadsStopWorkingCorrect.class, threadCount = 1)
public void waitTillProzessed() throws InterruptedException {
while (messageCount.get() > 0) {
Thread.sleep(1);
}
assertEquals(2, prozessedCount);
}
@Test
public void test() throws InterruptedException {
queue.offer("1");
messageCount.incrementAndGet();
Thread worker = new Thread(() -> readFromQueue());
worker.start();
waitTillProzessed();
worker.join();
}
}
Я протестировал оба примера с vmlens , инструментом, который я написал для тестирования многопоточного программного обеспечения.Поэтому чередование аннотаций.
В версии на основе набора некоторые чередования потоков приводят к prozessedCount == 0.В счетной версии prozessedCount всегда равен 2.