Я пишу класс Connection
, который отправляет и получает данные в обоих направлениях через Command
s и CommandResult
s.Однако, когда несколько запросов отправляются быстро через Connection
, некоторые не справляются должным образом.
Это звучит как своего рода состояние гонки, но я чувствую, что подготовился к этому с помощью:
- Блокировка и разблокировка записи в сокет,
- с таблицей отправленных
Command
s, чьи CommandResult
s не были получены, - и блокировкаи разблокирование изменений в указанной таблице.
Command
s принимаются и обрабатываются в одном потоке, так что это не должно быть проблемой.
Я просмотрелдостаточно много раз писать код, чтобы я чувствовал, что проблема должна быть в другом месте, но моя команда очень уверена, что виновником является Connection
.
Этот пример немного длинный, но он был настолько мал, насколько ямог бы сделать полный пример.Я действительно удостоверился, что это было хорошо зарегистрировано все же.Важные вещи, которые нужно знать:
AwaitWrapper
s - это просто будущее.Получение ресурса будет блокироваться до тех пор, пока он фактически не будет заполнен, Message
s просто переносит запросы и ответы, - a
Serializer
- это, по сути, оболочка gson, Command
с и CommandResult
с отслеживаются с помощью общего UUID, - и
ICommandHandler
с принимают Command
и выводят CommandResult
.Содержание Command
с и CommandResult
с не должно иметь значения для этого.
Connection.java:
public class Connection {
private Socket socket;
private ICommandHandler handler;
private Serializer ser;
private Lock resultsLock;
private Lock socketWriteLock;
private Map<UUID,AwaitWrapper<CommandResult>> reservations;
public Connection(Socket socket) {
ser = new Serializer();
reservations = new TreeMap<UUID,AwaitWrapper<CommandResult>>();
handler = null;
this.socket = socket;
// Set up locks
resultsLock = new ReentrantLock();
socketWriteLock = new ReentrantLock();
}
public Connection(String host, int port) throws UnknownHostException, IOException {
socket = new Socket(host, port);
ser = new Serializer();
reservations = new TreeMap<UUID,AwaitWrapper<CommandResult>>();
handler = null;
// Set up locks
resultsLock = new ReentrantLock(true);
socketWriteLock = new ReentrantLock(true);
}
/* Sends a command on the socket, and waits for the response
*
* @param com The command to be sent
* @return The Result of the command operation.
*/
public CommandResult sendCommand(Command com) {
try {
AwaitWrapper<CommandResult> delayedResult = reserveResult(com);
write(new Message(com));
CommandResult res = delayedResult.waitOnResource();
removeReservation(com);
return res;
} catch (IOException e) {
e.printStackTrace();
return null;
}
}
/* Sets handler for incoming Commands. Also starts listening to the socket
*
* @param handler The handler for incoming Commands
*/
public void setCommandHandler(ICommandHandler handler) {
if (handler == null) return;
this.handler = handler;
startListening();
}
/* Starts a thread that listens to the socket
*
* Note: don't call this until handler has been set!
*/
private void startListening() {
Thread listener = new Thread() {
@Override
public void run() {
while (receiveMessage());
handler.close();
}
};
listener.start();
}
/* Recives all messages (responses _and_ results) on a socket
*
* Note: don't call this until handler has been set!
*
* @return true if successful, false if error
*/
private boolean receiveMessage() {
InputStream in = null;
try {
in = socket.getInputStream();
Message message = (Message)ser.deserialize(in, Message.class);
if (message == null) return false;
if (message.containsCommand()) {
// Handle receiving a command
Command com = message.getCommand();
CommandResult res = handler.handle(com);
write(new Message(res));
} else if (message.containsResult()) {
// Handle receiving a result
CommandResult res = message.getResult();
fulfilReservation(res);
} else {
// Neither command or result...?
return false;
}
} catch (IOException e) {
e.printStackTrace();
return false;
}
return true;
}
//--------------------------
// Thread safe IO operations
private void write(Message mes) throws IOException {
OutputStream out = socket.getOutputStream();
socketWriteLock.lock();
ser.serialize(out, mes);
socketWriteLock.unlock();
}
//----------------------------------
//Thread safe reservation operations
private AwaitWrapper<CommandResult> reserveResult(Command com) {
AwaitWrapper<CommandResult> delayedResult = new AwaitWrapper<CommandResult>();
resultsLock.lock();
reservations.put(com.getUUID(), delayedResult);
resultsLock.unlock();
return delayedResult;
}
private void fulfilReservation(CommandResult res) {
resultsLock.lock();
reservations.get(res.getUUID()).setResource(res);
resultsLock.unlock();
}
private void removeReservation(Command com) {
resultsLock.lock();
reservations.remove(com.getUUID());
resultsLock.unlock();
}
//-------------------------------------------------------------------
// A Message wraps both commands and results for easy deserialization
private class Message {
...
}
}
При мониторинге принимающей стороны Connection
, обработчик никогда не срабатывает для некоторых из Command
отправленных.Он должен запускаться и обрабатывать каждый входящий Command
.
. Я рассматриваю отключение таблицы резервирования и блокировку записи в сокет до получения ответа, но я ожидаю, что это победило 'без значительных потерь в производительности.
Я пропустил какой-то важный шаг, который предотвратил бы гоночные условия?
РЕДАКТИРОВАТЬ: Добавление Serializer
и ICommandHandler
классы для любопытных.
Serializer.java:
public class Serializer {
private Gson gson;
public Serializer() {
gson = new Gson();
}
public Object deserialize(InputStream is, Class type) throws IOException {
JsonReader reader = new JsonReader(new InputStreamReader(is, StandardCharsets.UTF_8));
reader.setLenient(true);
if (reader.hasNext()) {
Object res = gson.fromJson(reader, type);
return res;
}
return null;
}
public void serialize(OutputStream os, Object obj) throws IOException {
JsonWriter writer = new JsonWriter(new OutputStreamWriter(os, StandardCharsets.UTF_8));
gson.toJson(obj, obj.getClass(), writer);
writer.flush();
}
}
ICommandHandler:
public interface ICommandHandler {
public CommandResult handle(Command com);
public void close();
}