Два BlockingQueue в одном бесконечном цикле? - PullRequest
0 голосов
/ 03 апреля 2010

У меня есть поток, который обрабатывает поступающие сообщения (бесконечный цикл). Для этого я использую BlockingQueue (Java), который работает довольно неплохо. Теперь я хочу добавить второй процессор в том же методе класса oder. Проблема сейчас в том, что в бесконечном цикле у меня есть эта часть

newIncomming = this.incommingProcessing.take();

Эта часть блокируется, если очередь пуста. Я ищу решение для обработки очередей в одном классе. Вторая очередь может быть обработана, только некоторые данные поступают в первую очередь. Есть ли способ обрабатывать очереди блокировки буксировки в одном и том же бесконечном цикле?

Ответы [ 4 ]

1 голос
/ 03 апреля 2010

Либо вам нужны два потока, либо они нужны вам, чтобы разделить одну и ту же очередь блокировки. (Или вам нужно использовать структуру, отличную от очереди блокировки)

0 голосов
/ 03 апреля 2010

Что я понимаю из вашего вопроса, я придумал следующее

Фрагмент кода

/*
 * To change this template, choose Tools | Templates
 * and open the template in the editor.
 */

package blockingqueues;

import java.io.BufferedReader;
import java.io.Console;
import java.io.InputStreamReader;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/**
 *
 * @author alfred
 */
public class BlockingQueues {
    private static String take(BlockingQueue<String> bq) {
        try {
            return bq.take();
        } catch(InterruptedException ie) {
            return null;
        }
    }

    public static void main(String args[]) throws Exception {
        final BlockingQueue<String> b1 = new LinkedBlockingQueue<String>();
        final BlockingQueue<String> b2 = new LinkedBlockingQueue<String>();
        ExecutorService es = Executors.newCachedThreadPool();
        es.execute(new Runnable() {
            public void run() {
                while (true) {
                    String results = take(b1);
                    if (results == null) {
                        break;
                    }
                    System.out.println("first: " + results);
                }
            }
        });
        es.execute(new Runnable() {
            public void run() {
                while (true) {
                    String results = take(b2);
                    if (results == null) {
                        break;
                    }
                    System.out.println("second: " + results);
                }
            }
        });
        BufferedReader br = new BufferedReader(
                new InputStreamReader(System.in)
                );
        String input = null;
        System.out.println("type x to quit.");
        while (!"x".equals(input)) {
            input = br.readLine();
            if (input.startsWith("1 ")) {
                // Add something to first blocking queue.
                b1.add(input.substring(2));   
            } else if (input.startsWith("2 ")) {
                // Add something to second blocking queue.
                b2.add(input.substring(2));
            }
        }
        es.shutdownNow();
        es.awaitTermination(10, TimeUnit.SECONDS);
        System.out.println("bye!");
        br.close();
    }
}

Выполнение программы:

Вы можете ввести текст из консоли, чтобы добавить задачу в очередь блокировки b1 или b2. Если ваша консоль input начинается с 1, например, input = "1 hello", тогда b1 будет обрабатывать задачу (печатать first: hello), в противном случае, если ввод начинается с 2, например, input = "world", тогда b2 печать second: world.

0 голосов
/ 03 апреля 2010

Я не уверен, что вы пытаетесь сделать, но если вы не хотите, чтобы поток блокировал очередь, если она пуста, вы можете использовать BlockingQueue.peek () , чтобы сначала проверить, очередь пуста или нет.

0 голосов
/ 03 апреля 2010

BlockingQueue предназначен для многопоточных реализаций. Вместо этого используйте простую очередь. Смотрите это .

...