Как создать очередь Consumer Producer - PullRequest
1 голос
/ 28 марта 2019

У меня есть производитель, который создает POJO со свойством, типа. Может быть только два типа, «A» и «B». У меня есть пул потоков для потребителей. Всякий раз, когда я получаю сообщение типа «B» от источника, прежде чем я смогу приступить к выполнению, я должен убедиться, что все другие потоки в пуле завершили выполнение (на данный момент это Thread.sleep по умолчанию). Затем потребительский поток должен получить сообщение типа «B» и запустить его. Пока этот поток не запущен, никакое сообщение не может быть извлечено из очереди.

Пример:

class POJO_Message{

String type; //This will contain the type of message "A" or "B"

}

Ответы [ 2 ]

0 голосов
/ 28 марта 2019

Вы можете использовать ReadWriteLock , чтобы работать. Когда тип сообщения «B», попробуйте получить блокировку записи , сообщение другого типа получает блокировку чтения. Один простой код, подобный этому.

public class ConsumerProducerQueue {
    ExecutorService executor = Executors.newFixedThreadPool(10);
    private ReadWriteLock lock = new ReentrantReadWriteLock();

    public void addMessage(Message message) {
        if ("B".equals(message.getType())) {
            lock.writeLock().lock();
            Future<?> result = executor.submit(new Task(message));
            try {
                result.get();
            } catch (Exception e) {

            } finally {
                lock.writeLock().unlock();
            }
        } else {
            lock.readLock().lock();
            Future<?> result = executor.submit(new Task(message));
            try {
                result.get();
            } catch (Exception e) {

            } finally {
                lock.readLock().unlock();
            }
        }
    }
}

производительность этого метода не очень хорошая.

0 голосов
/ 28 марта 2019

Вы можете использовать LinkedBlockingDeque .Пример:

public class ProducerConsumer {

    public static void main(String[] args) {

        final LinkedBlockingDeque<Message> queue = new LinkedBlockingDeque<>(10);

        final AtomicLong id = new AtomicLong(0);
        final Timer producer = new Timer(true);
        producer.scheduleAtFixedRate(new TimerTask() {
            @Override
            public void run() {
               queue.add(new Message(  String.format("msg: %s"  , id.incrementAndGet() ) ) );
            }
        }, 10, 10);

        // consume
        for(;;) {
            try {
                Message msg  = queue.take();
                System.out.println( msg );
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }

    }

    private static class Message {
        private final String id;

        public Message(String id) {
            this.id = id;
        }

        public String getId() {
            return id;
        }

        @Override
        public String toString() {
            return String.format("Message [id=%s]", id);
        }

    }

}
...