Синхронизированное использование буфера FIFO - PullRequest
1 голос
/ 02 октября 2011

Я пытаюсь создать систему, в которой один поток A добавляет элементы в буфер, затем другой поток B отвечает за чтение элементов в том порядке, в котором они были введены, а затем за выполнение некоторых потенциально длительных операций с ними.

Мое лучшее предположение:

 Class B extends Thread {

    Buffer fifo = BufferUtils.synchronizedBuffer(new BoundedFifoBuffer());

    add(Object o) { // Thread A calls me, and doesn't deal well with delays :)
      fifo.add(o); // will the sync below prevent this from happening? 
                   // or can .add be independent of the sync ?
    }

    run() {
     synchronized (fifo) { // why am i sync'd here?  I am the only thread accessing...
         while ( item in buffer ) { // also how do i check this, and block otherwise?
            process(fifo.remove());
         }
     }
    |
  }

Как видите, я даже не совсем уверен, нужна ли синхронизация. Проблема безопасности потока, которая у меня есть, не имеет ничего общего с доступом get (), так как к нему будет иметь доступ только один поток, но что важно most , так это то, что поток A вызывает метод .add () без параллелизма Доступ к Exception во время обработки потоком содержимого буфера.

Может, я обдумываю это? Безопасно ли быть с? Ваша оценка этой проблемы очень ценится.

С уважением,

Jay

Ответы [ 2 ]

2 голосов
/ 02 октября 2011

Если я не ошибаюсь, вас также может заинтересовать этот ArrayBlockingQueue класс.

1 голос
/ 02 октября 2011

Если у вас есть поток символов для регистрации, самый быстрый подход может заключаться в использовании канала.

    PipedOutputStream pos = new PipedOutputStream();
    final PipedInputStream pis = new PipedInputStream(pos, 256*1024);
    ExecutorService es = Executors.newSingleThreadExecutor();
    es.execute(new Runnable() {
        @Override
        public void run() {
            byte[] bytes = new byte[256*1024];
            int length;
            try {
                while ((length = pis.read(bytes)) > 0) {
                    // something slow.
                    Thread.sleep(1);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    });

    // time latency
    PrintWriter pw = new PrintWriter(pos);
    long start = System.nanoTime();
    int runs = 10*1000*1000;
    for(int i=0;i<runs;i++) {
        pw.println("Hello "+i);
    }
    long time = System.nanoTime() - start;
    System.out.printf("Took an average of %,d nano-seconds per line%n", time/runs);
    es.shutdown();

печать

    Took an average of 269 nano-seconds per line

Примечание: сама труба не создает мусора. (В отличие от очереди)


Вы можете использовать ExecutorService, чтобы обернуть очередь и поток (ы)

ExecutorService es =

es.submit(new Runnable() {
  public void run() {
     process(o);
  }
});
...