Мне нужно создать простой реактивный сервер, который принимает соединения TCP, и я столкнулся с интересным поведением RX, которое мне нужно объяснить и исправить. В этот момент сервер просто принимает соединения, считывает данные и записывает количество полученных данных в журнал. Единственная трудность для RX состоит в том, что после того, как клиент закрывает соединение, сервер должен быть повторно инициализирован (новый вызов accept()
). На самом деле это бесконечный цикл, который довольно сложно сделать в RX. Я могу использовать некоторые Subject
для построения фактического цикла из сигнала onComplete, но я слышал, что субъектов следует избегать, поэтому я попытался сгенерировать бесконечный Observable
, который естественным образом излучает новый импульс каждый раз, когда клиентское соединение завершается.
Это работает хорошо, но только до тех пор, пока весь сервер остается в главном потоке. Как только что-то в цепочке запускается в фоновом потоке, бесконечный Observable посылает поток импульсов для повторной инициализации сервера, см. Комментарии в коде. Это почему? Как мне разработать такой базовый сервер (не считая полностью RX)?
Observable
// initialize server address
.just(new InetSocketAddress(port))
// initialize server
.map(address -> AsynchronousServerSocketChannel.open().bind(address))
// infinite loop equivalent: generate infinite sequence of the same server instance
// as long as the downstream chain remains on the main thread it works as expected - server is reopened
// each time client closes the connection. If the downstream is however non-blocking, it generates flood of "Creating server"
// messages in log
.flatMap(server -> Observable
.generate((Emitter<AsynchronousServerSocketChannel> emitter) -> emitter.onNext(server)))
.doOnEach(notification -> LOG.info("Creating server"))
// repeat for every connection
.concatMap(
server -> Observable
// Wait for incoming connection
// This works because it is in fact blocking so it propably keeps the source infinite sequence observable cold
.fromFuture(server.accept())
// if it is however subscribed on background thread it results in flood of "Creating server" log messages
// generated by the infinite sequence observable
//.subscribeOn(Schedulers.io())
/*
// this has the same behavior as subscribeOn, because it's asynchronous by design (handler is called from outside)
.create((ObservableEmitter<AsynchronousSocketChannel> emitter) -> server.accept(server,
new CompletionHandler<AsynchronousSocketChannel, AsynchronousServerSocketChannel>() {
@Override
public void completed(AsynchronousSocketChannel result,
AsynchronousServerSocketChannel server) {
emitter.onNext(result);
emitter.onComplete();
}
@Override
public void failed(Throwable exc, AsynchronousServerSocketChannel server) {
emitter.onError(exc);
}
}
))
*/
.doOnNext(notification -> LOG.info("Server created"))
// read incoming data and emit it's length
.flatMap(channel -> Observable.create((ObservableEmitter<Integer> emitter) ->
{
while (channel.isOpen()) {
try {
Integer result = channel.read(ByteBuffer.allocate(32)).get();
if (result == -1) {
channel.close();
} else {
emitter.onNext(result);
}
} catch (Exception e) {
emitter.onError(e);
}
}
emitter.onComplete();
}
))
.doOnComplete(() -> LOG.info("Server completed"))
.doOnError(error -> LOG.info("Server error " + error))
)
.forEach(readBytes -> LOG.info("Read bytes " + readBytes));