RSocket возобновить сеанс / поток после потери соединения - PullRequest
1 голос
/ 29 января 2020

Я использую канал RSocket и пытаюсь реализовать функцию возобновления на случай потери соединения с сервером. После попытки использования различных комбинаций resume (), resumeSessionDuration (), resumeStreamTimeout () и resumeCleanupOnKeepAlive () / documents: https://javadoc.io/static/io.rsocket/rsocket-core/1.0.0-RC3/io/rsocket/RSocketFactory.ServerRSocketFactory.html#resume - / на сервере и мобильном устройстве, все равно ничего не происходит. На мобильном устройстве я отправляю данные на сервер каждые 10 секунд, а когда соединение теряется, сокет располагается на 3-й повторной попытке. Кто-нибудь знает, что именно эти методы делают на стороне сервера и на стороне клиента? Документации практически нет ...

Это код на сервере:

RSocketFactory.receive()
                    .resume()
                    .acceptor((setupPayload, reactiveSocket) -> Mono.just(responseHandler))
                    .transport(TcpServerTransport.create(buildSecuredTcpServer(address, port, 
                    sslContext)))
                    .start()
                    .block();

Код на мобильном устройстве:

 socket = RSocketFactory.connect()
        .errorConsumer(throwable -> {
            if (throwable instanceof RejectedResumeException){
                Log.d("tagg", "error: " + throwable.getMessage());
            }
        })
        .resume()
        .resumeStreamTimeout(Duration.ofSeconds(120))
        .resumeSessionDuration(Duration.ofSeconds(600))
        .keepAlive(Duration.ofSeconds(120), Duration.ofSeconds(120), 120)
        .resumeStrategy(() -> new PeriodicResumeStrategy(Duration.ofSeconds(1)))
        .metadataMimeType(BuildConfig.RSOCKET_METADATA_MIME_TYPE)
        .dataMimeType(BuildConfig.RSOCKET_DATA_MIME_TYPE)
        .transport(TcpClientTransport.create(tcpClient))
        .start()
        .block();
Objects.requireNonNull(socket)
        .requestChannel(Flux.from(this::onSubscribe))
        .doOnNext(this::receiveData)
        .retryBackoff(Integer.MAX_VALUE, Duration.ofSeconds(1))
        .subscribe();
...