Java делятся данными между потоками - PullRequest
4 голосов
/ 04 марта 2011

У меня есть процесс Java, который читает данные с сервера сокетов.Таким образом, у меня есть объект BufferedReader и PrintWriter, соответствующий этому сокету.

Теперь в том же процессе Java у меня есть многопоточный сервер Java, который принимает клиентские подключения.Я хочу добиться функциональности, при которой все эти клиенты, которых я принимаю, могут читать данные из объекта BufferedReader, о котором я упоминал выше (чтобы они могли мультиплексировать данные)

Как мне сделать этоотдельные клиентские потоки считывают данные из одного объекта BuffereReader? Извините за путаницу.

Ответы [ 3 ]

6 голосов
/ 04 марта 2011

Я бы настоятельно рекомендовал, чтобы они не обращались к BufferedReader напрямую. Предполагая, что вы знаете формат данных, и что каждый клиент пытается прочитать один и тот же формат (если это не так, я не вижу, как это вообще может работать), я бы предложил создать один поток для чтения из BufferedReader и положить рабочие элементы в Queue. Есть лотов примеров использования очередей производителей / потребителей в Java, и это также может упростить тестирование клиентского кода.

Наличие только одного потока, обращающегося к BufferedReader, означает, что вам не нужно беспокоиться об определении атомарной операции, для которой все остальные потоки должны будут удовлетворять - ваш поток чтения эффективно определяет эту операцию, решив добавить рабочий элемент в очередь.

РЕДАКТИРОВАТЬ: Если все клиенты должны видеть все данных, это подтверждает мое предположение о том, чтобы иметь еще одного считывателя еще дальше - за исключением того, что вместо Queue данных, из которых удаляются элементы, у вас будет коллекция, из которой клиенты могут читать все существующие данные. Вам нужно будет использовать соответствующую потокобезопасную коллекцию, но в Java их много.

РЕДАКТИРОВАТЬ: Просто прочитав ваш комментарий, который говорит, что каждый клиент должен просто увидеть последний элемент, прочитанный из читателя, что делает его еще проще. Пусть один поток читает данные, но просто хранит одну переменную со ссылкой на «последний прочитанный элемент». Вы, вероятно, хотите либо синхронизировать доступ к нему, либо использовать AtomicReference, но оба эти способа просты.

1 голос
/ 04 марта 2011

Если вы не можете хранить все данные в памяти, вы можете использовать что-то вроде этого: (Использует классический шаблон наблюдателя, когда один поток отслеживает источник, а другие уведомляются о новых данных)

class Broadcaster {
    private final ConcurrentSkipListSet<Listener> listeners =
         new ConcurrentSkipListSet<Listener>();

    private final BufferedReader source = //inject source

    // register / de-gegister code
    public void register(Listener listener);
    public void deRegister(Listener listener);
    // usually it's used like broadcaster.register(this);

    public void broadcast(){
        String read = null;
        while((read = source.readLine())!=null){
            for(Listener listener : listeners){
                listener.send(read);
            }
        }
    }
 }

и

class Listener implements Runnable {
    private final Queue<String> queue = new LinkedBlockingQueue<String>();

    public void send(String read){
        queue.add(read);
    }

    public void run(){
        while(!Thread.currentThread().isInterrupted()){
            String got = queue.get();
            System.out.println(got);
        }
    }
}

Я не проверял эту вещь или что-либо еще, поэтому помилуйте, если она не работает!

РЕДАКТИРОВАТЬ : Если выЕсли у вас есть ограничение памяти, вы можете захотеть сделать это также:

Queue<String> queue = new LinkedBlockingQueue<String>(2000);

Это установит верхний предел на количество элементов, которые могут стоять в очереди в очереди, и когда этот предел будет достигнут, потоки, которые хотятположить элементы на это придется ждать (если используется add, то есть).Таким образом, Broadcaster будет блокировать (ждать), когда слушатели не смогут использовать данные достаточно быстро (в то время как в этой простой схеме другой слушатель может голодать).

РЕДАКТИРОВАТЬ 2: Когда вы делаете это, вы должны ставить в очередь только неизменные вещи.Если вы поставите, например, byte[], невежливый слушатель может изменить данные, а другие потоки могут быть удивлены.Если это как-то невозможно, вы должны поместить разные копии в каждую очередь, например, byteArray.clone().Это может привести к некоторому снижению производительности.

1 голос
/ 04 марта 2011

Возможно, проще всего создать временный файл и использовать java.nio.channels.FileChannel.Причина в том, что это уже обеспечивает семантику чтения / записи, которую вы хотите.Альтернативой является повторная реализация этой семантики самостоятельно с помощью схемы в памяти.

Что бы вы сделали, это прочитали из вашего считывателя и добавили в конец файла.Тогда ваши читатели получат файловые каналы, изначально указывающие на конец файла.Вы должны быть осторожны, чтобы убедиться, что они не думают, что достигли конца потока, когда вы просто ожидаете большего ввода от читателя.Вы можете либо сделать так, чтобы нижестоящая логика знала об этом, либо предоставить какую-то оболочку для читателей.

Редактировать: это было написано с пониманием, что вы хотите, чтобы все читатели видели все.Это слишком сложно, если на самом деле это не то, что вы хотите.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...