Как создать петли обратной связи или бесконечные петли в RX без субъектов? - PullRequest
0 голосов
/ 22 мая 2019

Мне нужно создать простой реактивный сервер, который принимает соединения TCP, и я столкнулся с интересным поведением RX, которое мне нужно объяснить и исправить. В этот момент сервер просто принимает соединения, считывает данные и записывает количество полученных данных в журнал. Единственная трудность для RX состоит в том, что после того, как клиент закрывает соединение, сервер должен быть повторно инициализирован (новый вызов accept()). На самом деле это бесконечный цикл, который довольно сложно сделать в RX. Я могу использовать некоторые Subject для построения фактического цикла из сигнала onComplete, но я слышал, что субъектов следует избегать, поэтому я попытался сгенерировать бесконечный Observable, который естественным образом излучает новый импульс каждый раз, когда клиентское соединение завершается.

Это работает хорошо, но только до тех пор, пока весь сервер остается в главном потоке. Как только что-то в цепочке запускается в фоновом потоке, бесконечный Observable посылает поток импульсов для повторной инициализации сервера, см. Комментарии в коде. Это почему? Как мне разработать такой базовый сервер (не считая полностью RX)?

Observable
    // initialize server address
    .just(new InetSocketAddress(port))
    // initialize server
    .map(address -> AsynchronousServerSocketChannel.open().bind(address))
    // infinite loop equivalent: generate infinite sequence of the same server instance
    // as long as the downstream chain remains on the main thread it works as expected - server is reopened
    // each time client closes the connection. If the downstream is however non-blocking, it generates flood of "Creating server"
    // messages in log
    .flatMap(server -> Observable
            .generate((Emitter<AsynchronousServerSocketChannel> emitter) -> emitter.onNext(server)))
    .doOnEach(notification -> LOG.info("Creating server"))
    // repeat for every connection
    .concatMap(
            server -> Observable
            // Wait for incoming connection 
            // This works because it is in fact blocking so it propably keeps the source infinite sequence observable cold
            .fromFuture(server.accept())
            // if it is however subscribed on background thread it results in flood of "Creating server" log messages
            // generated by the infinite sequence observable
            //.subscribeOn(Schedulers.io())
            /*
            // this has the same behavior as subscribeOn, because it's asynchronous by design (handler is called from outside)
            .create((ObservableEmitter<AsynchronousSocketChannel> emitter) -> server.accept(server,
                    new CompletionHandler<AsynchronousSocketChannel, AsynchronousServerSocketChannel>() {
                        @Override
                        public void completed(AsynchronousSocketChannel result,
                                AsynchronousServerSocketChannel server) {
                            emitter.onNext(result);
                            emitter.onComplete();
                        }

                        @Override
                        public void failed(Throwable exc, AsynchronousServerSocketChannel server) {
                            emitter.onError(exc);
                        }
                    }
            ))
                */
            .doOnNext(notification -> LOG.info("Server created"))
            // read incoming data and emit it's length
            .flatMap(channel -> Observable.create((ObservableEmitter<Integer> emitter) -> 
                    {
                        while (channel.isOpen()) {
                            try {
                                Integer result = channel.read(ByteBuffer.allocate(32)).get();
                                if (result == -1) {
                                    channel.close();
                                } else {
                                    emitter.onNext(result);
                                }
                            } catch (Exception e) {
                                emitter.onError(e);
                            }
                        }
                        emitter.onComplete();
                    }
            ))
            .doOnComplete(() -> LOG.info("Server completed"))
            .doOnError(error -> LOG.info("Server error " + error))
    )
    .forEach(readBytes -> LOG.info("Read bytes " + readBytes));

1 Ответ

0 голосов
/ 23 мая 2019

Ваш код выглядит загадочно. Даже если вам удастся заставить его работать, его будет сложно поддерживать. Rx и Nio2 используют разные модели асинхронных вычислений, поэтому использование Rx с Nio2 только усложняет программирование.

Я предлагаю либо использовать чистый Nio2, либо использовать мою асинхронную библиотеку df4j , которая имеет адаптеры как для Nio2, так и для rx-java2.

...