Java асинхронный клиент, непредвиденное поведение - PullRequest
1 голос
/ 06 мая 2019

Недавно я открыл один из моих асинхронных фрагментов сокетов, и я был поражен ошибочным поведением следующего клиентского кода, это базовый клиент PoC, использующий AsynchronousSocketChannel.

Этот фрагмент в идеале, он никогда не должен достигать "Я волнуюсь", но это так.В основном проблема в том, что я использую ByteBuffer, который в конце цикла я установил на 0, в начале цикла я ожидаю, что он будет равен 0, но иногда это не так.

Видео, показывающееошибка:
https://www.youtube.com/watch?v=bV08SjYutRw&feature=youtu.be
Я могу решить проблему, вызвав .clear () сразу после .nextLine (), но мне любопытно, что происходит в этом невинном фрагменте?

package com.melardev.sockets.clients;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.Scanner;

public class AsyncTcpClientCallbacks {

    // Horrible demo, I would never write asynchronous sockets this way, I would use attachments
    // which allows our code to be cleaner and isolate everything into their own classes
    // This is only here to show you how you could do it without attachments, but you have
    // to expose the socketChannel so it can be accessible from everywhere, my recommendation is not to bother
    // learning, this, go to the demo where I use attachments, it is a lot more readable


    static CompletionHandler<Integer, ByteBuffer> readHandler = new CompletionHandler<Integer, ByteBuffer>() {
        @Override
        public void completed(Integer bytesReaded, ByteBuffer buffer) {
            buffer.flip();
            byte[] receivedBytes = new byte[buffer.limit()];
            // Get into receivedBytes
            buffer.get(receivedBytes);
            String message = new String(receivedBytes);
            System.out.println(message);

            buffer.clear();
            socketChannel.read(buffer, buffer, this);
        }

        @Override
        public void failed(Throwable exc, ByteBuffer buffer) {
            System.err.println("Error reading message");
            System.exit(1);
        }

    };

    static private CompletionHandler<Integer, Void> writeHandler = new CompletionHandler<Integer, Void>() {
        @Override
        public void completed(Integer bytesWritten, Void attachment) {

        }

        @Override
        public void failed(Throwable exc, Void attachment) {
            System.err.println("Something went wrong");
            System.exit(-1);
        }
    };

    private static AsynchronousSocketChannel socketChannel;

    public static void main(String[] args) {

        try {
            socketChannel = AsynchronousSocketChannel.open();


            //try to connect to the server side
            socketChannel.connect(new InetSocketAddress("localhost", 3002), null
                    , new CompletionHandler<Void, Void>() {
                        @Override
                        public void completed(Void result, Void attachment) {


                            ByteBuffer receivedBuffer = ByteBuffer.allocate(1024);
                            socketChannel.read(receivedBuffer, receivedBuffer, readHandler);


                            Scanner scanner = new Scanner(System.in);
                            System.out.println("Write messages to send to server");
                            ByteBuffer bufferToSend = ByteBuffer.allocate(1024);
                            String line = "";
                            while (!line.equals("exit")) {
                                if (bufferToSend.position() != 0) {
                                    System.err.println("I am freaking out 1");
                                }
                                line = scanner.nextLine();
                                if (bufferToSend.position() != 0) {
                                    System.err.println("I am freaking out 2");
                                }
                                byte[] bytesToWrite = line.getBytes();
                                // bufferToSend.clear();
                                bufferToSend.put(bytesToWrite);
                                System.out.println(bufferToSend.limit());
                                bufferToSend.flip();
                                System.out.println(bufferToSend.limit());

                                if (bufferToSend.position() != 0) {
                                    System.err.println("I am freaking out 3");
                                }
                                if (bufferToSend.limit() != line.length()) {
                                    System.err.println("I am freaking out 4");
                                }

                                socketChannel.write(bufferToSend, null, writeHandler);


                                bufferToSend.limit(bufferToSend.capacity());
                                bufferToSend.position(0);
                                // The two lines above are the same as
                                // bufferToSend.clear(); // Reuse the same buffer, so set pos=0
                                // limit to the capacity which is 1024

                                if (bufferToSend.position() != 0) {
                                    System.err.println("I am freaking out 5");
                                }


                            }
                        }

                        @Override
                        public void failed(Throwable exc, Void nothing) {
                            System.out.println("Error connection to host");
                        }

                    });

            while (true) {
                try {
                    Thread.sleep(60 * 1000);
                    // Sleep 1 min ... who cares ?
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

}

Как, черт возьми, выполняются операторы "Я волнуюсь"?Я не вижу, как это могло бы быть, условия никогда не должны оцениваться как истинные, я сделал видео, где отчетливо показана ошибка, иногда в начале цикла while () положение буфера отличается от 0, но этоне должно, потому что в конце цикла я установил его на 0. Удивительно, этого поведения не происходит, когда я делаю точку останова перед запуском приложения и отслеживаю построчно ... как это может быть?
Я записал это, я начал с трассировки через отладчик, все работало нормально, но как только я убрал взлом и позволил отладчику работать, тот же код, который работал раньше, теперь нет.Чего мне не хватает?
Видео, показывающее, когда он работал с трассировкой, находится здесь: https://www.youtube.com/watch?v=0H1OJdZO6AY&feature=youtu.be
Если вы хотите использовать сервер для игры, то это тот, который используется в видео

package com.melardev.sockets.servers;

import com.melardev.sockets.Constants;

import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

public class AsyncTcpEchoServerKey {

    public static void main(String[] args) {

        try {
            // Create new selector
            Selector selector = Selector.open();
            ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
            serverSocketChannel.socket().setReuseAddress(true);
            // By default this is true, so set it to false for nio sockets
            serverSocketChannel.configureBlocking(false);
            InetAddress loopbackAddress = InetAddress.getLoopbackAddress();
            // Bind to localhost and specified port
            serverSocketChannel.socket().bind(new InetSocketAddress(loopbackAddress, Constants.SOCKET_PORT));


            // ServerSocketChannel only supports OP_ACCEPT (see ServerSocketChannel::validOps())
            // it makes sense, server can only accept sockets
            int operations = SelectionKey.OP_ACCEPT;

            serverSocketChannel.register(selector, operations);
            while (true) {
                if (selector.select() <= 0) {
                    continue;
                }

                try {
                    processReadySet(selector.selectedKeys());
                } catch (IOException e) {
                    continue;
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }

    }

    public static void processReadySet(Set readySet) throws IOException {
        Iterator iterator = readySet.iterator();
        while (iterator.hasNext()) {
            SelectionKey key = (SelectionKey) iterator.next();
            // After processing a key, it still persists in the Set, we wanna remove it
            // otherwise we will get it back the next time processReadySet is called
            // We would end up processing the same "event" as many times this method is called
            iterator.remove();
            System.out.printf("isAcceptable %b isConnectable %b isReadable %b isWritable %b\n"
                    , key.isAcceptable(), key.isConnectable(), key.isReadable(), key.isWritable());
            if (key.isAcceptable()) {
                ServerSocketChannel ssChannel = (ServerSocketChannel) key.channel();

                // Get the client socket channel
                SocketChannel clientSocketChannel = (SocketChannel) ssChannel.accept();
                // Configure it as non-blocking socket
                clientSocketChannel.configureBlocking(false);
                // Register the socket with the key selector, we want to get notified when we have
                // something to read from socket(OP_READ)
                clientSocketChannel.register(key.selector(), SelectionKey.OP_READ);
            } else if (key.isReadable()) {
                // A Remote client has send us a message
                String message = "nothing";
                // Get the socket who sent the message
                SocketChannel sender = (SocketChannel) key.channel();
                ByteBuffer buffer = ByteBuffer.allocate(1024);
                int bytesCount = 0;
                try {
                    bytesCount = sender.read(buffer);

                    if (bytesCount > 0) {

                        // 1. Get manually
                        message = new String(buffer.array(), 0, bytesCount);

                        // 2. Or, use flip
                        // set buffer.position =0 and buffer.limit = bytesCount
                        buffer.flip();
                        byte[] receivedMessageBytes = new byte[bytesCount];
                        buffer.get(receivedMessageBytes);
                        message = new String(receivedMessageBytes);
                        System.out.println("Receive " + message);
                        // Writing
                        // 1. Easy approach, create a new ByteBuffer and send it
                        // ByteBuffer outputBuffer = ByteBuffer.wrap(message.getBytes());
                        // sender.write(outputBuffer);

                        // 2. Or to reuse the same buffer we could
                        // buffer.limit(buffer.position());
                        // buffer.position(0);

                        // 3. Or the same as point 2, but one line
                        buffer.flip();

                        sender.write(buffer);


                    } else {
                        SocketChannel ssChannel = (SocketChannel) key.channel();
                        ssChannel.close();
                        System.out.println("Client disconnected");
                        break;
                    }

                } catch (IOException e) {
                    e.printStackTrace();
                    try {
                        sender.close();
                    } catch (IOException e1) {
                        e1.printStackTrace();
                    }
                }
            }
        }
    }

}

PD: Первое видео показывает даже больше, чем то, что я сказал ранее, обратите внимание, что перед тем, как установить разрыв, консоль показала, что я волнуюсь 2 и 4, когда было установлено отключение, я также вызвал, что я волнуюсь1, чего вначале не было, и не только это, но когда я возобновил процесс, на этот раз я волнуюсь, 1 не сработал !!!

1 Ответ

2 голосов
/ 06 мая 2019

Документация AsynchronousSocketChannel.write() гласит:

Буферы небезопасны для использования несколькими параллельными потоками, поэтому следует соблюдать осторожность, чтобы не получить доступ к буферу до завершения операции.

Вы не проявляете такой осторожности, скорее, вы устанавливаете предел и позицию bufferToSend независимо от завершения записи, поэтому неожиданное поведение может быть связано с этой небрежностью.

...