Производитель, использующий очередь блокировки - PullRequest
0 голосов
/ 02 марта 2012

У меня есть реализация Producer / Consumer, использующая блокировку, я запускаю их вот так

BlockingQueue<Object> myQueue1 = new LinkedBlockingQueue<Object>();
new Thread(new SmsInProducer(myQueue1, 100)).start();

for (int i = 0; i < 100; i++ ) {
    new Thread(new SmsInConsumer(myQueue1)).start();
}

Внутри продюсер выглядит так

public class SmsInProducer implements Runnable {
    protected BlockingQueue queue;
    protected int MAX_RECORDS = 5000;

    @SuppressWarnings("rawtypes")
    public SmsInProducer(BlockingQueue theQueue, int maxRecord) {
        this.queue = theQueue;
        this.MAX_RECORDS = maxRecord;
    }

    @SuppressWarnings({ "unchecked", "rawtypes" })
    @Override
    public void run() {
        // TODO Auto-generated method stub
    while (true) {
            int totalRecords = MAX_RECORDS - queue.size();
            if (totalRecords > 0) {

            List<Map> tList = Sql.updateTRECEIVE();
            if (tList != null && !tList.isEmpty()) {
                queue.addAll(tList);
            }
        }
        // wait for 1 second
    try { Thread.sleep(Parameter.replyWaitTime * 1000); }   catch(Exception e) {}
    }
}

и потребитель выглядит так

public class SmsInConsumer implements Runnable {
@SuppressWarnings("rawtypes")
protected BlockingQueue queue;

@SuppressWarnings("rawtypes")
public SmsInConsumer(BlockingQueue theQueue) {
        this.queue = theQueue;
    }

@SuppressWarnings("rawtypes")
@Override
public void run() {
        // TODO Auto-generated method stub
        while (true) {
        try {
        Object obj = queue.take();
                Map map = (Map) obj;

        } catch (InterruptedException ex) {
        Thread.currentThread().interrupt();
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        try { Thread.sleep(Parameter.replyWaitTime * 1000); } catch(Exception e){}
        }
     }
}

Но через какое-то время это становится очень медленным, могу ли я в любом случае поддерживать его очень быстрым?

Ответы [ 2 ]

0 голосов
/ 25 февраля 2017
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.logging.Level;
import java.util.logging.Logger;

/*
 * To change this license header, choose License Headers in Project Properties.
 * To change this template file, choose Tools | Templates
 * and open the template in the editor.
 */
/**
 *
 * @author sakshi
 */
public class BlockingqueueDemo {

    static BlockingQueue<Integer> queue = new LinkedBlockingQueue<Integer>();

    static class Producer extends Thread {

        BlockingQueue queue;

        public Producer(BlockingQueue<Integer> queue) {
            this.queue = queue;

        }

        public void run() {
            for (int i = 0; i < 10; i++) {
                System.out.println("produce" + i);
                try {
                    queue.put(i);
                    Thread.sleep(2000);

               } catch (InterruptedException ex) {
                    Logger.getLogger(BlockingqueueDemo.class.getName()).log(Level.SEVERE, null, ex);
                }
            }
        }

    }

    static class Consumer extends Thread {

        BlockingQueue queue;

        public Consumer(BlockingQueue<Integer> queue) {
            this.queue = queue;

        }

        public void run() {
            for (int i = 0; i < 10; i++) {
                try {
                    System.out.println("consume" + queue.take());

                } catch (InterruptedException ex) {
                    Logger.getLogger(BlockingqueueDemo.class.getName()).log(Level.SEVERE, null, ex);
                }
            }
        }
    }

    public static void main(String[] args) {
        Producer produce = new Producer(queue);
        Consumer consume = new Consumer(queue);
        produce.start();
        consume.start();
    }

}

output:

produce0
consume0
produce1
consume1
produce2
consume2
produce3
consume3
produce4
consume4
produce5
consume5
produce6
consume6
produce7
consume7
produce8
consume8
produce9
consume9
0 голосов
/ 02 марта 2012

Удалите спящий режим в Consumer, вызов queue.take () заблокирует поток, пока объект не станет доступным, поэтому нет необходимости спать.

Но из-за того, что он запускается быстро и со временем замедляется, попробуйте оптимизировать вашего продюсера.Попробуйте использовать больше элементов в очереди, оптимизируйте Sql.updateTRECEIVE ();и сними сон.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...