java сетевое программирование координация обмена сообщениями - PullRequest
4 голосов
/ 18 ноября 2010

У меня есть 2 процесса, запущенных на разных машинах, которые обмениваются данными через TCP-сокеты.
Оба процесса имеют код, который действует как сервер и как клиент.
То есть ProcessA открыл сокет сервера, который привязывается к портуX, а ProcessB открыл привязку сокета сервера к портуY.
ProcessA открывает клиентский сокет для соединения с ProcessB и начинает отправку сообщений в качестве клиента и получение ответов (по тому же TCP-соединению, конечно).
ProcessB, как только он получает сообщение и обрабатывает его, он отправляет ответ, но также может отправить сообщение по второму tcp-соединению, т. Е. Когда ProcessB открыл клиентский сокет для порта X ProcessA.
Таким образом, поток сообщений проходит через 2 разных соединения TCP.
Моя проблема заключается в следующем: Принимая как должное, что эта «архитектура» не может измениться и должна оставаться как есть:
У меня проблема в том, что периодически сообщения, отправляемые из ProcessB в ProcessA по TCP-соединению, в котором ProcessB открыл клиентский сокет, поступают в processA до того, как сообщения отправляются в виде ответов от ProcessB в ProcessA через TCP-соединение, которое ProcessA подключил в качестве клиента розетка.
То есть Оба потока происходят

(1)  
ProcessA ---->(msg)----> ProcessB(PortY)  (TCP1)
ProcessB does processing   
ProcessB(portY)--->(response)----->ProcessA (TCP1)  
ProcessB--->(msg)----->ProcessA(portX)  (TCP2)

(2)  
ProcessA ---->(msg)----> ProcessB(PortY)  (TCP1)
ProcessB does processing   
ProcessB--->(msg)----->ProcessA(portX)  (TCP2)
ProcessB(portY)--->(response)----->ProcessA  (TCP1)

РЕДАКТИРОВАТЬ (после запроса ejp) Как я могу принудить / убедиться, что ProcessB не отправляет сообщение через соединение, в котором у ProcessB есть клиентский сокет, открытый для порта сервера X ProcessA, до того, как сообщение, отправленное как ответ от порта сервера ProcessB, поступит в processA? То есть иметь только поток (1) из вышеперечисленного.
Обратите внимание, что processB является многопоточным, а обработка - нетривиальной.

UPDATE: Может быть, это мое заблуждение, но когда процесс отправляет данные через сокет и управление возвращается приложению, это не означает, что принимающая сторона получила данные. Так что, если процесс отправляет данные через 2 сокета, существует ли условие гонки в ОС?

UPDATE2
После ответа я получил от Виджая Мэтью:
Если я сделал блокировку в соответствии с предложением, есть ли гарантия, что ОС (т.е. уровень IP) будет отправлять данные по порядку? То есть закончить одну передачу, а затем отправить следующую? Или я бы их мультиплексировал и имел бы одинаковую проблему?

Спасибо

Ответы [ 3 ]

1 голос
/ 02 августа 2011

Проблема синхронизации может заключаться не в протоколе TCP, а в обработчике потока, который выбирает, какой поток активировать при поступлении сообщений.Из характера вашего вопроса я понимаю, что PortX "(Msg)" отправляется очень быстро после PortY "(Ответ)".Это означает, что обработчик потока может иногда иметь выбор относительно того, какой из двух потоков прослушивания он будет активировать.

Простой, но уродливый и неполный способ решения проблемы - вставить короткий сон междуответ и следующее сообщение.Спящий режим должен быть достаточно длинным, чтобы быть уверенным, что другой процесс проснется до ответа до получения следующего сообщения.Этот способ неполон, потому что, хотя вы увеличиваете изменения, связанные с правильной синхронизацией обработки, такие проблемы, как загрузка ОС и перегрузка сети, всегда могут привести к тому, что ваше сообщение окажется позади вашего ответа.А потом ты вернулся туда, откуда начал.Другое уродство заключается в том, что сон тратит время и снижает вашу максимальную пропускную способность.Просто реже.Итак ...

Чтобы полностью решить проблему, необходимо, чтобы каждый прослушиватель сокетов знал, является ли только что полученное сообщение следующим обрабатываемым, или могут существовать более ранние сообщения, которые должныобрабатываться в первую очередь.Сделайте это путем последовательной нумерации всех сообщений, отправляемых каждым процессом.Затем процесс приема узнает, что-то отсутствует.

Вам нужно будет придумать способ, которым слушатели на каждом сокете будут взаимодействовать между собой, чтобы гарантировать, что полученные сообщения обрабатываются в порядке передачи.Существует несколько практических решений, но все они равны на абстрактном, концептуальном уровне.

РЕЗЬБА 1: A) Поток ProcessA (PortX) получает сообщение и пробуждается.
B)ЕСЛИ порядковый номер указывает на наличие пропущенного сообщения, ТО B1) синхронизируется на ProcessA (PortY) и wait ().B2) После пробуждения вернитесь к B) C) {сообщение не пропущено} Обработайте сообщение.D) Назад к A)

THREAD 2: A) ProcessA (PortY) получает ответ и просыпается.Б) Обработка ответа.В) уведомить все ().D) Назад к A)

Наиболее общие практические решения, вероятно, будут включать в себя один экземпляр прослушивателя сокетов, добавляющий все новые сообщения в PriorityQueue, так что самые ранние отправленные сообщения всегда отправляются в начало очереди.Затем поток 1 и поток 2 могут ожидать в этом экземпляре, пока не прибудет сообщение, которое они могут обработать.

Более простое, но менее расширяемое решение состояло бы в том, чтобы каждый поток выполнял свое собственное прослушивание и ожидание с (ответом)обработчик уведомляет после обработки.

Удачи с ним, хотя по прошествии этого времени он, вероятно, уже решен.

1 голос
/ 18 ноября 2010

Очевидное решение:

LockObject lock;

ProcessA ---->(msg)----> ProcessB(PortY)

// Processing the request and sending its response 
// makes a single transaction.
synchronized (lock) {
    ProcessB does processing   
    ProcessB(portY)--->(response)----->ProcessA (TCP1)
}

// While the processing code holds the lock, B is not
// allowed to send a request to A.
synchronized (lock) {
    ProcessB--->(msg)----->ProcessA(portX)  (TCP2)
}
0 голосов
/ 19 ноября 2010

Очевидный вопрос: почему тебя это волнует? Если у вас есть операции, которые нужно синхронизировать с обоих концов, сделайте это. Не ожидайте, что TCP сделает это за вас, это не то, для чего он.

...