Недетерминированное поведение приложения на основе сокетов - PullRequest
0 голосов
/ 01 ноября 2018

Я работаю над клиент-серверным приложением и столкнулся с одной проблемой. Я получил ServerWorker, который отвечает за одного подключенного клиента, он создает 2 потока, 1 для прослушивания входящих данных от этого клиента и 1 для отправки данных ему.

class ServerWorker {
    private DataProcessor dataProcessor;
    private ObjectInputStream inputStream;
    private ObjectOutputStream outputStream;
    private Thread receiverThread;
    private Thread senderThread;
    private Optional<DataPacket> dataToSend;

    private ServerWorker(Socket socket) {
        try {
            dataToSend = Optional.empty();
            dataProcessor = new DataProcessor();
            receiverThread = new Thread(this::readAndProcessData);
            senderThread = new Thread(this::sendData);
            inputStream = new ObjectInputStream(socket.getInputStream());
            outputStream = new ObjectOutputStream(socket.getOutputStream());
        } catch (IOException e) {
            //TODO
        }
    }

    static ServerWorker create(Socket socket) {
        return new ServerWorker(socket);
    }

    void start() {
        receiverThread.start();
        senderThread.start();
    }

    void stop() {
        receiverThread.interrupt();
        senderThread.interrupt();
    }

    private void readAndProcessData() {
        DataPacket dataPacket;
        try {
            while((dataPacket = (DataPacket)inputStream.readObject()) != null) {
                System.out.println("incoming message: " + dataPacket.getContent());
                dataToSend = Optional.of(dataProcessor.process(dataPacket));
            }
        } catch (ClassNotFoundException | IOException e) {
            //TODO
        }
    }

    private void sendData() {
        while(true) { //TODO
            dataToSend.ifPresent(data -> {
                try {
                    outputStream.writeObject(data);
                    outputStream.flush();
                    dataToSend = Optional.empty();
                } catch (IOException e) {
                    //TODO
                }
            });
        }
    }
}

А DataProcessor - это пока небольшой класс

public class DataProcessor {
   public DataPacket process(DataPacket packet){
        packet.setContent(packet.getContent().toUpperCase());
        return packet;
   }
}

и конечно, DataPacket, что одинаково для клиента и сервера

public class DataPacket implements Serializable {
    private String content;

    public String getContent() {
        return content;
    }

    public void setContent(String content) {
        this.content = content;
    }
}

DataProcessor и DataPackets - это всего лишь разновидности POC, теперь они превратятся в гораздо большие и более сложные классы, короче говоря, ServerWorker получит данные и передаст их для обработки, а затем после некоторой логики возвращаемые данные будут сохранены в переменной dataToSend и удалены после отправки. Проблема в том, что код, который я выложил выше, работает только иногда. В 90% случаев, когда я запускаю свое серверное приложение и клиентское приложение (код ниже), ничего не происходит, прописные буквы "hello world" не возвращаются к клиенту. Что забавно, когда я запускаю свой сервер в режиме отладки (даже без каких-либо точек останова!), Он работает ... Есть идеи, что, черт возьми, пошло не так?

 public static void main(String... args) throws Exception {
        Socket socket = new Socket("localhost", 9999);
        DataPacket dataPacket = new DataPacket();
        dataPacket.setContent("hello world");
        ObjectOutputStream os = new ObjectOutputStream(socket.getOutputStream());
        os.writeObject(dataPacket);
        os.flush();

        ObjectInputStream inputStream = new ObjectInputStream(socket.getInputStream());
        while((dataPacket = (DataPacket)inputStream.readObject()) != null) {
            System.out.println(dataPacket.getContent());
        }
    }

редактировать # добавление еще одного класса ConnectionDispatcher, который отвечает за создание ServerWorker объектов

class ConnectionDispatcher implements Runnable {
    private ServerSocket serverSocket;
    private List<ServerWorker> serverWorkers;
    private volatile boolean isReceiving;

    private ConnectionDispatcher(int port) throws IOException {
        serverSocket = new ServerSocket(port);
        serverWorkers = new ArrayList<>();
        isReceiving = false;
    }

    static ConnectionDispatcher create(int port) throws IOException {
       return new ConnectionDispatcher(port);
    }

    @Override
    public void run() {
        isReceiving = true;
        while(isReceiving) {
           acceptIncomingConnections();
        }
    }

    private void acceptIncomingConnections() {
        try {
            ServerWorker worker = ServerWorker.create(serverSocket.accept());
            serverWorkers.add(worker);
            worker.start();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

Ответы [ 2 ]

0 голосов
/ 02 ноября 2018

Вы можете использовать Очередь блокировки массива для имитации модели производителя и потребителя.

Пусть поток получателя поместит новый DataPacket в очередь, и пусть отправитель возьмет из очереди, обработает и отправит его.

Это устранит проблемы с многопоточностью и будет действовать как буфер.

С вашим текущим кодом вы можете потерять пакеты, когда они поступят с более высокой скоростью.

И я согласен с user930, private Optional<DataPacket> dataToSend; должен быть изменчивым.

Также вы можете сделать свой код более масштабируемым с помощью JavaNIO, вы можете изучить Apache Mina проект.

0 голосов
/ 01 ноября 2018

Кажется, что вы изменяете dataToSend из одного потока, одновременно читая его значение в другом. Это не потокобезопасно, и поток, который читает его значение, может никогда не увидеть обновленное значение, установленное другим потоком. По этой причине я бы объявил dataToSend как volatile.

private volatile Optional<DataPacket> dataToSend;

У меня еще не было возможности проверить это самостоятельно, но я смогу примерно через час (при условии, что это изменение не решит вашу проблему).

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...