StreamCorruptedException после ~ 1000 переданных объектов - PullRequest
0 голосов
/ 20 апреля 2019

Я хочу перенести объекты (AssignmentListener) с одного Java-сервера на 5 Java-клиентов.

Поэтому я написал способ отправить сообщение:

    private void sendMessage(AssignmentListener listener, int[] subpartitionIndices){
        boolean success = false;
        int failCount = 0;

        // retry for the case of failure
        while(!success && failCount < 10) {
            try {
                // get the stored socket & stream if stored
                if(listener.getSocket() == null) {
                    if (localMode) {
                        listener.setSocket(new Socket("localhost", listener.getPort()));
                    } else {
                        listener.setSocket(new Socket(listener.getIp(), listener.getPort()));
                    }
                    listener.setOutputStream(new ObjectOutputStream(listener.getSocket().getOutputStream()));
                }

                AssignmentListenerMessage assignmentListenerMessage = new AssignmentListenerMessage(subpartitionIndices);
                System.out.println("Sending " + assignmentListenerMessage);

                listener.getOutputStream().writeObject(assignmentListenerMessage);
                listener.getOutputStream().flush();

                success = true;

            } catch (IOException se) {
                se.printStackTrace();
                System.err.println("Failed to forward " + Arrays.toString(subpartitionIndices) + " to " + listener);
                failCount++;
            }
        }
    }

На стороне клиента у меня есть следующее:

    public void run() {

        String mode = "remote";
        if(localMode) mode = "local";
        // we need to register this listener at at the OverpartitioningManager
        if(register(isLocalRequest)) System.out.println("Registered AssignmentListenerServer for index "+subpartitionIndex+" at ForwardingServer -  "+mode);

        running = true;
        while (running) {
            try {
                socket = serverSocket.accept();


                // Pass the socket to the RequestHandler thread for processing
                RequestHandler requestHandler = new RequestHandler( socket );
                requestHandler.start();

            } catch (SocketException se) {
                se.printStackTrace();
            } catch (IOException e) {
                e.printStackTrace();
            }


        }
        try {
            socket.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }





    class RequestHandler extends Thread {
        private Socket socket;

        RequestHandler(Socket socket) {
            this.socket = socket;
        }

        @Override
        public void run() {
            try {
                System.out.println("Received a connection");

                // Get input and output streams
                inStream = new ObjectInputStream(socket.getInputStream());

                //outStream = new DataOutputStream(socket.getOutputStream());
                AssignmentListenerMessage incomingMessage = null;
                while(socket.isBound()) {
                    try {
                        incomingMessage = (AssignmentListenerMessage) inStream.readObject();
                    }catch (StreamCorruptedException sce){
                        System.out.println("Failed to read AssignmentMessage from Stream, but will try again... (no ack)");
                        sce.printStackTrace();
                        continue;
                    }

                        // do stuff with the message

                }

                // Close our connection
                inStream.close();
                socket.close();

                System.out.println("Connection closed");
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

Это работает несколько раз, но в один момент я получаю следующее исключение:

java.io.StreamCorruptedException: invalid type code: 00

У кого-нибудь есть идея или какое-либо другое улучшение производительности для того, что я делаю? Спасибо.

...