AsynchronousSocketChannel Read / WritePendingException - можно ли синхронизировать? - PullRequest
0 голосов
/ 07 мая 2020

Я работаю на TCP-сервере, и мне любопытно, можно ли синхронизировать чтение и запись методов AsynchronousSocketChannel. Я обернул канал в другой класс, потому что мне нужны были дополнительные функции на моем канале. Мой вопрос в том, действительно ли это правильный способ синхронизации:

/**
 * writes bytes from a <b>ByteBuffer</b> into an
 * <b>AsynchronousSocketChannel</b>
 * 
 * @param buffer    the ByteBuffer to write from
 * @param onFailure specifies the method that should be called on failure of the
 *                  write operation
 */
public void write(ByteBuffer buffer, final C onFailure) {

    CompletionHandler<Integer, ByteBuffer> handler = new CompletionHandler<Integer, ByteBuffer>() {

        @Override
        public void completed(Integer result, ByteBuffer buf) {
            if (buf.hasRemaining())
                channel.write(buf, buf, this);
        }

        @Override
        public void failed(Throwable exc, ByteBuffer buf) {
            attachment.call(onFailure, exc);
        }

    };

    synchronized (writeLock) {
        this.channel.write(buffer, buffer, handler);
    }
}

В этом случае writeLock - это объект static final, который получает блокировку при запуске любого из произвольных экземпляров моего класса оболочки. операция записи. Это действительно работает или у него просто закончился синхронизированный блок?

1 Ответ

0 голосов
/ 19 мая 2020

Вот как я это исправил:

/**
 * writes bytes from a <b>ByteBuffer</b> into an
 * <b>AsynchronousSocketChannel</b>
 * 
 * @param buffer    the ByteBuffer to write from
 * @param onFailure specifies the method that should be called on failure of the
 *                  write operation
 */
public void write(ByteBuffer buffer, final C onFailure) {

    CompletionHandler<Integer, ByteBuffer> handler = new CompletionHandler<Integer, ByteBuffer>() {

        @Override
        public void completed(Integer result, ByteBuffer buf) {
            if (buf.hasRemaining()) {
                channel.write(buf, buf, this);
                return;
            }

            synchronized (writeLock) {
                if (!writeQueue.isEmpty()) {
                    while (writePending)
                        ;

                    ByteBuffer writeBuf = writeQueue.pop();
                    channel.write(writeBuf, writeBuf, this);
                    writePending = true;
                    return;
                }
            }

            writePending = false;
        }

        @Override
        public void failed(Throwable exc, ByteBuffer buf) {
            writePending = false;
            attachment.call(onFailure, exc);
        }

    };

    synchronized (writeLock) {
        while (this.writePending)
            ;

        this.writeQueue.push(buffer);

        ByteBuffer writeBuffer = this.writeQueue.pop();
        this.channel.write(writeBuffer, writeBuffer, handler);
        this.writePending = true;
    }
}
...