У меня есть система, которая читает имена из списка, вызывает внешний сервер для проверки истинного / ложного статуса и выполняет действия с истинным статусом. вызов на внешний сервер занимает некоторое время, поэтому запуск всего этого в одном потоке не очень эффективен.
В настоящее время я пытаюсь реализовать ее как систему «производитель / потребитель», в которой многие потребительские потоки читают имена из списка, вызывают внешний сервер, помещают действительные имена в очередь блокировки и имеют одного потребителя, выбирающего элементы из очереди. и действовать им. к сожалению, однако, система иногда будет работать до конца, а иногда зависать до бесконечности.
Тестовый код выглядит следующим образом
public class SubscriberTest {
static Queue<String> subscribed = new ConcurrentLinkedQueue<String>();
static BlockingQueue<String> valid = new LinkedBlockingQueue<String>(100);
Random rand = new Random();
public SubscriberTest(int i) {
for (int j = 0; j < i; j++) {
subscribed.add("I love:" + j);
}
}
public SubscriberTest(Queue<String> subs) {
subscribed = subs;
}
public static void main(String[] args) {
SubscriberTest fun = new SubscriberTest(10000);
System.out.println(subscribed.size());
ExecutorService producers = Executors.newCachedThreadPool();
ExecutorService consumers = Executors.newSingleThreadExecutor();
Consumer consumer = fun.new Consumer();
Producer producer = fun.new Producer();
while (!subscribed.isEmpty()) {
producers.execute(producer);
consumers.execute(consumer);
}
producers.shutdown();
consumers.shutdown();
System.out.println("finally");
}
// take names from subscribed and get status
class Producer implements Runnable {
public void run() {
String x = subscribed.poll();
System.out.println("Producer: " + x + " " + Thread.currentThread().getName());
try {
if (getStatus(x)) {
valid.put(x);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// this is a call to an external server
private boolean getStatus(String x) {
return rand.nextBoolean();
}
}
// takes names from valid queue and save them
class Consumer implements Runnable {
public void run() {
try {
System.out.println("Consumer: " + valid.take() + " " + Thread.currentThread().getName());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
Пожалуйста, покажи мне, где я ошибаюсь.