Если вы не можете хранить все данные в памяти, вы можете использовать что-то вроде этого: (Использует классический шаблон наблюдателя, когда один поток отслеживает источник, а другие уведомляются о новых данных)
class Broadcaster {
private final ConcurrentSkipListSet<Listener> listeners =
new ConcurrentSkipListSet<Listener>();
private final BufferedReader source = //inject source
// register / de-gegister code
public void register(Listener listener);
public void deRegister(Listener listener);
// usually it's used like broadcaster.register(this);
public void broadcast(){
String read = null;
while((read = source.readLine())!=null){
for(Listener listener : listeners){
listener.send(read);
}
}
}
}
и
class Listener implements Runnable {
private final Queue<String> queue = new LinkedBlockingQueue<String>();
public void send(String read){
queue.add(read);
}
public void run(){
while(!Thread.currentThread().isInterrupted()){
String got = queue.get();
System.out.println(got);
}
}
}
Я не проверял эту вещь или что-либо еще, поэтому помилуйте, если она не работает!
РЕДАКТИРОВАТЬ : Если выЕсли у вас есть ограничение памяти, вы можете захотеть сделать это также:
Queue<String> queue = new LinkedBlockingQueue<String>(2000);
Это установит верхний предел на количество элементов, которые могут стоять в очереди в очереди, и когда этот предел будет достигнут, потоки, которые хотятположить элементы на это придется ждать (если используется add
, то есть).Таким образом, Broadcaster будет блокировать (ждать), когда слушатели не смогут использовать данные достаточно быстро (в то время как в этой простой схеме другой слушатель может голодать).
РЕДАКТИРОВАТЬ 2: Когда вы делаете это, вы должны ставить в очередь только неизменные вещи.Если вы поставите, например, byte[]
, невежливый слушатель может изменить данные, а другие потоки могут быть удивлены.Если это как-то невозможно, вы должны поместить разные копии в каждую очередь, например, byteArray.clone()
.Это может привести к некоторому снижению производительности.