Мое сокетное соединение теряет некоторые данные, отправленные через него? - PullRequest
0 голосов
/ 04 апреля 2019

Я пишу класс Connection, который отправляет и получает данные в обоих направлениях через Command s и CommandResult s.Однако, когда несколько запросов отправляются быстро через Connection, некоторые не справляются должным образом.

Это звучит как своего рода состояние гонки, но я чувствую, что подготовился к этому с помощью:

  • Блокировка и разблокировка записи в сокет,
  • с таблицей отправленных Command s, чьи CommandResult s не были получены,
  • и блокировкаи разблокирование изменений в указанной таблице.

Command s принимаются и обрабатываются в одном потоке, так что это не должно быть проблемой.

Я просмотрелдостаточно много раз писать код, чтобы я чувствовал, что проблема должна быть в другом месте, но моя команда очень уверена, что виновником является Connection.

Этот пример немного длинный, но он был настолько мал, насколько ямог бы сделать полный пример.Я действительно удостоверился, что это было хорошо зарегистрировано все же.Важные вещи, которые нужно знать:

  • AwaitWrapper s - это просто будущее.Получение ресурса будет блокироваться до тех пор, пока он фактически не будет заполнен,
  • Message s просто переносит запросы и ответы,
  • a Serializer - это, по сути, оболочка gson,
  • Command с и CommandResult с отслеживаются с помощью общего UUID,
  • и ICommandHandler с принимают Command и выводят CommandResult.Содержание Command с и CommandResult с не должно иметь значения для этого.

Connection.java:

public class Connection {

    private Socket socket;
    private ICommandHandler handler;
    private Serializer ser;
    private Lock resultsLock;
    private Lock socketWriteLock;
    private Map<UUID,AwaitWrapper<CommandResult>> reservations;

    public Connection(Socket socket) {
        ser = new Serializer();
        reservations = new TreeMap<UUID,AwaitWrapper<CommandResult>>();
        handler = null;
        this.socket = socket;

        // Set up locks
        resultsLock = new ReentrantLock();
        socketWriteLock = new ReentrantLock();
    }

    public Connection(String host, int port) throws UnknownHostException, IOException {
        socket = new Socket(host, port);
        ser = new Serializer();
        reservations = new TreeMap<UUID,AwaitWrapper<CommandResult>>();
        handler = null;

        // Set up locks
        resultsLock = new ReentrantLock(true);
        socketWriteLock = new ReentrantLock(true);
    }


    /* Sends a command on the socket, and waits for the response
     *
     * @param com The command to be sent
     * @return The Result of the command operation.
     */
    public CommandResult sendCommand(Command com) {
        try {
            AwaitWrapper<CommandResult> delayedResult = reserveResult(com);
            write(new Message(com));

            CommandResult res = delayedResult.waitOnResource();
            removeReservation(com);
            return res;

        } catch (IOException e) {
            e.printStackTrace();
            return null;
        }
    }

    /* Sets handler for incoming Commands. Also starts listening to the socket
     *
     * @param handler The handler for incoming Commands
     */
    public void setCommandHandler(ICommandHandler handler) {
        if (handler == null) return;
        this.handler = handler;
        startListening();
    }

    /* Starts a thread that listens to the socket
     *
     * Note: don't call this until handler has been set!
     */
    private void startListening() {
        Thread listener = new Thread() {
            @Override
            public void run() {
                while (receiveMessage());
                handler.close();
            }
        };
        listener.start();
    }

    /* Recives all messages (responses _and_ results) on a socket
     *
     * Note: don't call this until handler has been set!
     *
     * @return true if successful, false if error
     */
    private boolean receiveMessage() {
        InputStream in = null;
        try {
            in = socket.getInputStream();

            Message message = (Message)ser.deserialize(in, Message.class);
            if (message == null) return false;

            if (message.containsCommand()) {
                // Handle receiving a command
                Command com = message.getCommand();
                CommandResult res = handler.handle(com);
                write(new Message(res));

            } else if (message.containsResult()) {
                // Handle receiving a result
                CommandResult res = message.getResult();
                fulfilReservation(res);
            } else {
                // Neither command or result...?
                return false;
            }

        } catch (IOException e) {
            e.printStackTrace();
            return false;
        }
        return true;
    }


    //--------------------------
    // Thread safe IO operations

    private void write(Message mes) throws IOException {
        OutputStream out = socket.getOutputStream();
        socketWriteLock.lock();
        ser.serialize(out, mes);
        socketWriteLock.unlock();
    }


    //----------------------------------
    //Thread safe reservation operations

    private AwaitWrapper<CommandResult> reserveResult(Command com) {
        AwaitWrapper<CommandResult> delayedResult = new AwaitWrapper<CommandResult>();

        resultsLock.lock();
        reservations.put(com.getUUID(), delayedResult);
        resultsLock.unlock();

        return delayedResult;
    }

    private void fulfilReservation(CommandResult res) {
        resultsLock.lock();
        reservations.get(res.getUUID()).setResource(res);
        resultsLock.unlock();
    }

    private void removeReservation(Command com) {
        resultsLock.lock();
        reservations.remove(com.getUUID());
        resultsLock.unlock();
    }


    //-------------------------------------------------------------------
    // A Message wraps both commands and results for easy deserialization

    private class Message {
        ...
    }
}

При мониторинге принимающей стороны Connection, обработчик никогда не срабатывает для некоторых из Command отправленных.Он должен запускаться и обрабатывать каждый входящий Command.

. Я рассматриваю отключение таблицы резервирования и блокировку записи в сокет до получения ответа, но я ожидаю, что это победило 'без значительных потерь в производительности.

Я пропустил какой-то важный шаг, который предотвратил бы гоночные условия?


РЕДАКТИРОВАТЬ: Добавление Serializer и ICommandHandler классы для любопытных.

Serializer.java:

public class Serializer {

    private Gson gson;

    public Serializer() {
        gson = new Gson();
    }

    public Object deserialize(InputStream is, Class type) throws IOException {
        JsonReader reader = new JsonReader(new InputStreamReader(is, StandardCharsets.UTF_8));
        reader.setLenient(true);
        if (reader.hasNext()) {
            Object res = gson.fromJson(reader, type);
            return res;
        }
        return null;
    }

    public void serialize(OutputStream os, Object obj) throws IOException {
        JsonWriter writer = new JsonWriter(new OutputStreamWriter(os, StandardCharsets.UTF_8));
        gson.toJson(obj, obj.getClass(), writer);
        writer.flush();
    }
}

ICommandHandler:

public interface ICommandHandler {
    public CommandResult handle(Command com);
    public void close();
}

1 Ответ

0 голосов
/ 04 апреля 2019

Блокировки ничего не делают в вашем случае, там нет условий гонки, если один и тот же обработчик используется для нескольких сокетов, тогда блокировки должны быть внутри обработчика, вы используете однопоточный клиент, поэтому блокировки делают там ничего нет, обратите внимание: при использовании замков используйте try-finally. Если вы только начинаете с Sockets, вы, вероятно, этого не знаете, но SocketChannel намного более эффективен, чем класс Socket, который очень стар. Я не могу вам больше помочь, не увидев Serializer и ICommandHandler. Это, скорее всего, проблема в Serializer.

...