Обмен сообщениями - абстрактное понятие. Сообщение можно рассматривать как часть данных, которая должна быть отправлена, передана, получена и обработана каким-либо образом. Он не привязан к конкретному языку программирования, фреймворку или библиотеке. Помимо Spring, сообщения Websocket, JMS и т. Д. c повсеместно распространены на низком уровне, таком как Win32 API, D-Bus, сети.
Что касается вашей задачи, то сообщение может быть просто строкой. Нет необходимости представлять его как JSON / XML / et c, поскольку оба проигрывателя работают в одном процессе.
Исходное описание задачи довольно широкое. С моей точки зрения, вам нужна не рекурсия , а своего рода маятник . Первый игрок отправляет сообщение инициализации и ждет ответа. Тем временем второй игрок получает сообщение инициализации, добавляет свой счетчик к сообщению и отвечает. Затем первый игрок просыпается после получения ответа, добавляет свой счетчик и отправляет обратно. Этот обмен повторяется снова и снова. Кроме того, каждый игрок не должен отправлять следующее сообщение до того, как получит ответ на предыдущее сообщение.
Очереди блокировки - лучший подход в чистом виде Java в рамках одного процесса.
Рассмотрим следующий код:
import java.math.BigInteger;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
public class MessageTask
{
// Blocking queue looks superfluous for single message. But such a queue saves us from cumbersome
// synchronization of the threads.
private static final int MAX_MESSAGES_IN_QUEUE = 1;
public static void main(String[] args)
{
BlockingQueue<String> firstToSecond = new ArrayBlockingQueue<String>(MAX_MESSAGES_IN_QUEUE);
BlockingQueue<String> secondToFirst = new ArrayBlockingQueue<String>(MAX_MESSAGES_IN_QUEUE);
// Both players use the same queues symmetrically.
InitiatorPlayer firstPlayer = new InitiatorPlayer(firstToSecond, secondToFirst);
Player secondPlayer = new Player(secondToFirst, firstToSecond);
// Please note that we can start threads in reverse order. But thankfully to
// blocking queues the second player will wait for initialization message from
// the first player.
new Thread(secondPlayer).start();
new Thread(firstPlayer).start();
}
}
class Player implements Runnable
{
protected final BlockingQueue<String> sent;
protected final BlockingQueue<String> received;
// Please aware that integer field may overflow during prolonged run
// of the program. So after 2147483647 we'll get -2147483648. We can
// either use BigInteger or compare the field with Integer.MAX_VALUE
// before each increment.
//
// Let's choose BigInteger for simplicity.
private BigInteger numberOfMessagesSent = new BigInteger("0");
public Player(BlockingQueue<String> sent, BlockingQueue<String> received)
{
this.sent = sent;
this.received = received;
}
@Override
public void run()
{
while (true)
{
String receivedMessage = receive();
reply(receivedMessage);
}
}
protected String receive()
{
String receivedMessage;
try
{
// Take message from the queue if available or wait otherwise.
receivedMessage = received.take();
}
catch (InterruptedException interrupted)
{
String error = String.format(
"Player [%s] failed to receive message on iteration [%d].",
this, numberOfMessagesSent);
throw new IllegalStateException(error, interrupted);
}
return receivedMessage;
}
protected void reply(String receivedMessage)
{
String reply = receivedMessage + " " + numberOfMessagesSent;
try
{
// Send message if the queue is not full or wait until one message
// can fit.
sent.put(reply);
System.out.printf("Player [%s] sent message [%s].%n", this, reply);
numberOfMessagesSent = numberOfMessagesSent.add(BigInteger.ONE);
// All players will work fine without this delay. It placed here just
// for slowing the console output down.
Thread.sleep(1000);
}
catch (InterruptedException interrupted)
{
String error = String.format(
"Player [%s] failed to send message [%s] on iteration [%d].",
this, reply, numberOfMessagesSent);
throw new IllegalStateException(error);
}
}
}
class InitiatorPlayer extends Player
{
private static final String INIT_MESSAGE = "initiator player";
public InitiatorPlayer(BlockingQueue<String> sent, BlockingQueue<String> received)
{
super(sent, received);
}
@Override
public void run()
{
sendInitMessage();
while (true)
{
String receivedMessage = receive();
reply(receivedMessage);
}
}
private void sendInitMessage()
{
try
{
sent.put(INIT_MESSAGE);
System.out.printf("Player [%s] sent message [%s].%n", this, INIT_MESSAGE);
}
catch (InterruptedException interrupted)
{
String error = String.format(
"Player [%s] failed to sent message [%s].",
this, INIT_MESSAGE);
throw new IllegalStateException(error, interrupted);
}
}
}
Пример вывода:
Player [InitiatorPlayer@712dc5e9] sent message [initiator player].
Player [Player@69bf9b51] sent message [initiator player 0].
Player [InitiatorPlayer@712dc5e9] sent message [initiator player 0 0].
Player [Player@69bf9b51] sent message [initiator player 0 0 1].
Player [InitiatorPlayer@712dc5e9] sent message [initiator player 0 0 1 1].
Player [Player@69bf9b51] sent message [initiator player 0 0 1 1 2].
Player [InitiatorPlayer@712dc5e9] sent message [initiator player 0 0 1 1 2 2].
Player [Player@69bf9b51] sent message [initiator player 0 0 1 1 2 2 3].
Player [InitiatorPlayer@712dc5e9] sent message [initiator player 0 0 1 1 2 2 3 3].
Player [Player@69bf9b51] sent message [initiator player 0 0 1 1 2 2 3 3 4].
Player [InitiatorPlayer@712dc5e9] sent message [initiator player 0 0 1 1 2 2 3 3 4 4].
Player [Player@69bf9b51] sent message [initiator player 0 0 1 1 2 2 3 3 4 4 5].
Player [InitiatorPlayer@712dc5e9] sent message [initiator player 0 0 1 1 2 2 3 3 4 4 5 5].
PS Вы можете получить немного другой вывод на вашем компьютере, например:
Player [InitiatorPlayer@82b9342] sent message [initiator player].
Player [InitiatorPlayer@82b9342] sent message [initiator player 0 0].
Player [Player@5d7a0209] sent message [initiator player 0].
Player [Player@5d7a0209] sent message [initiator player 0 0 1].
Player [InitiatorPlayer@82b9342] sent message [initiator player 0 0 1 1].
Player [Player@5d7a0209] sent message [initiator player 0 0 1 1 2].
Player [InitiatorPlayer@82b9342] sent message [initiator player 0 0 1 1 2 2].
Player [Player@5d7a0209] sent message [initiator player 0 0 1 1 2 2 3].
Player [InitiatorPlayer@82b9342] sent message [initiator player 0 0 1 1 2 2 3 3].
Player [Player@5d7a0209] sent message [initiator player 0 0 1 1 2 2 3 3 4].
Player [InitiatorPlayer@82b9342] sent message [initiator player 0 0 1 1 2 2 3 3 4 4].
Player [Player@5d7a0209] sent message [initiator player 0 0 1 1 2 2 3 3 4 4 5].
Player [InitiatorPlayer@82b9342] sent message [initiator player 0 0 1 1 2 2 3 3 4 4 5 5].
Это происходит из-за того, что игроки работают в разных потоках. Возможна следующая ситуация:
- Первый игрок отправляет сообщение и печатает запись журнала на консоль.
- Второй игрок получает сообщение и отправляет ответ. Но соответствующий поток приостанавливается планировщиком потоков сразу после отправки ответа.
- Первый игрок получает ответ, отправляет еще одно сообщение и выводит запись журнала на консоль.
- Поток второго игрока просыпается планировщиком потоков и печатает запись журнала о своем ответе, упомянутом в пункте 3.
Это правильное поведение. Игроки синхронизируются очередями, например, первый игрок не отправит новое сообщение до ответа на предыдущее. Но печать журналов на консоль не синхронизируется (и не должна синхронизироваться) с отправкой / получением сообщений.
PS2 Задача может быть решена только с одной блокирующей очередью (или даже с одним мьютексом) . Но две отдельные очереди лучше подходят для иллюстрации и возможного расширения решения.