Вам нужно немного взвесить, если у вас много конечных машин и случайных сообщений каждому или несколько конечных машин и частых сообщений каждому.
Если у вас много конечных машин, то буквально наличие одного потока на конечную машину звучит немного громко, если вы не собираетесь постоянно передавать сообщения на все эти машины. Я бы предложил иметь пул потоков, которые будут расти только между определенными границами. Для этого вы можете использовать ThreadPoolExecutor. Когда вам нужно опубликовать сообщение, вы фактически отправляете исполняемый файл исполнителю, который отправит сообщение:
Executor msgExec = new ThreadPoolExecutor(...);
public void sendMessage(final String machineId, byte[] message) {
msgExec.execute(new Runnable() {
public void run() {
sendMessageNow(machineId, message);
}
});
}
private void sendMessageNow(String machineId, byte[] message) {
// open connection to machine and send message, thinking
// about the case of two simultaneous messages to a machine,
// and whether you want to cache connections.
}
Если у вас есть только несколько конечных компьютеров, то у вас может быть BlockingQueue на машину и поток на очередь блокировки, ожидающий следующего сообщения. В этом случае шаблон больше похож на этот (остерегайтесь непроверенного кода в воскресенье утром):
ConcurrentHashMap<String,BockingQueue> queuePerMachine;
public void sendMessage(String machineId, byte[] message) {
BockingQueue<Message> q = queuePerMachine.get(machineId);
if (q == null) {
q = new BockingQueue<Message>();
BockingQueue<Message> prev = queuePerMachine.putIfAbsent(machineId, q);
if (prev != null) {
q = prev;
} else {
(new QueueProessor(q)).start();
}
}
q.put(new Message(message));
}
private class QueueProessor extends Thread {
private final BockingQueue<Message> q;
QueueProessor(BockingQueue<Message> q) {
this.q = q;
}
public void run() {
Socket s = null;
for (;;) {
boolean needTimeOut = (s != null);
Message m = needTimeOut ?
q.poll(60000, TimeUnit.MILLISECOND) :
q.take();
if (m == null) {
if (s != null)
// close s and null
} else {
if (s == null) {
// open s
}
// send message down s
}
}
// add appropriate error handling and finally
}
}
В этом случае мы закрываем соединение, если в течение 60 секунд не пришло сообщение для этого устройства.
Стоит ли вместо этого использовать JMS? Ну, вы должны взвесить, кажется ли вам это сложным. Мое личное ощущение, что это не достаточно сложная задача, чтобы оправдать специальные рамки. Но я уверен, что мнения расходятся.
P.S. В действительности, теперь я смотрю на это, вы, вероятно, поместите очередь в объект потока и просто отобразите ID машины -> объект потока. В любом случае, вы поняли идею.