Проблемы с записью списка ByteBuffer в socketChannel, когда список содержит более одного элемента - PullRequest
0 голосов
/ 14 февраля 2020

, поэтому я пытался отправить несколько сообщений в socketChannel, используя java nio Selectors, как показано в методе, написанном ниже. Однако проблема заключается в том, что когда «Список очереди» содержит более одного элемента, то соединение обрывается или, скорее, ни одно из сообщений не принимается другим socketChannel, теперь я не знаю, является ли это проблемой, почему метод read или написать метод? Может кто-нибудь сказать мне, что я здесь делаю не так? И как это исправить? Заранее благодарим.

Пишите и читайте следующим образом: -

    private void write(SelectionKey selectionKey) throws IOException {
    SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
    List<ByteBuffer> queue = this.pendingWrites.get(selectionKey);

    // Write until there's no more data left ...
    while (!queue.isEmpty()) {
        ByteBuffer buf = queue.get(0);
        try {
            socketChannel.write(buf);
        } catch (IOException ex) {
            //There could be an IOException: Connection reset by peer
            queue.clear(); //clear the queue

            return;
        }
        if (buf.remaining() > 0) {
            // ... or the selectionKey's buffer fills up
            break;
        }
        queue.remove(0);
    }

    if (queue.isEmpty()) {
    // We wrote away all data, so we're no longer interested
    // in writing on this selectionKey. Switch back to waiting for
    // data.
    selectionKey.interestOps(SelectionKey.OP_READ);
    this.selector.wakeup();
    }
}

private void read(SelectionKey key) throws IOException {
    SocketChannel socketChannel = (SocketChannel) key.channel();

    // Clear out our read buffer so it's ready for new data
    this.readBuffer.clear();

    // Attempt to read off the channel
    int numRead;
    try {
        numRead = socketChannel.read(this.readBuffer);
    } catch (IOException e) {

    }

        //TODO: Shutting down connection
        //this.cmdProcessor.connectionClosed(remoteAddress.getAddress());

        // The remote forcibly closed the connection, cancel
        // the selection key and close the channel.
        key.cancel();
        socketChannel.close();

        return;
    }

    if (numRead == -1) {

        // Remote entity shut the selectionKey down cleanly. Do the
        // same from our end and cancel the channel.
        key.channel().close();
        key.cancel();

        return;
    }

    byte[] dataCopy = new byte[numRead];
    System.arraycopy(this.readBuffer.array(), 0, dataCopy, 0, numRead);
    //System.out.println("#tempdata:" + new String(dataCopy, Constants.TELNET_ENCODING));

    // If we have already received some data, we add this to our buffer
    if (this.pendingReads.containsKey(key)) {
        byte[] existingBytes = pendingReads.get(key);

        byte[] concatenated = new byte[existingBytes.length + dataCopy.length];
        System.arraycopy(existingBytes, 0, concatenated, 0, existingBytes.length);
        System.arraycopy(dataCopy, 0, concatenated, existingBytes.length, dataCopy.length);

        dataCopy = concatenated;
    }

    //If somebody sends us very long requests, we just close the connection
    if (dataCopy.length > 128000) {
        key.channel().close();
        key.cancel();

        this.pendingReads.remove(key);
        return;
    }

    // In case we have now finally reached all characters
    byte[] unprocessed = processReceiveBuffer(dataCopy, key);
    this.pendingReads.put(key, unprocessed);
}

    private byte[] processReceiveBuffer(byte[] data, SelectionKey key) throws InvalidProtocolBufferException {

    if (data.length > 4) {//the first 4 bytes are the length
        int msg_length = ((0xFF & data[0]) << 24) | ((0xFF & data[1]) << 16) | ((0xFF & data[2]) << 8) | (0xFF & data[3]);

        if ((msg_length + 4) >= data.length) {
            byte[] msg_data = new byte[msg_length];
            System.arraycopy(data, 4, msg_data, 0, msg_length);

            api.BaseMessage bm = api.BaseMessage.parseFrom(msg_data);
            handleRequest(key, bm);

            msg_data = new byte[msg_length + 4 - data.length];
            System.arraycopy(data, msg_length + 4, msg_data, 0, msg_data.length);

            return msg_data;
        }
    }

    return data;
}
...