Сервер не получает данные в случае RSocket Fire и Forget - PullRequest
0 голосов
/ 22 октября 2019

У меня есть следующий сервер RSocket

@Log4j2
public class NativeRsocketServerFnF {
  public static void main(String[] args) {
    RSocketFactory.receive()
        .frameDecoder(ZERO_COPY)
        .errorConsumer(log::error)
        .acceptor((setup, clientHandleRsocket) -> {
          return Mono.just(
              new AbstractRSocket() {
                @Override
                public Mono<Void> fireAndForget(Payload payload) {
                  CharSequence message = payload.data().readCharSequence(payload.data().readableBytes(), forName("UTF-8"));
                  payload.release();
                  log.info("> from client: {}", message);
                  return Mono.empty();
                }
              }
          );
        })
        .transport(TcpServerTransport.create(8000))
        .start()
        .block()
        .onClose()
        .block();
  }
}

и следующий клиент RSocket

@Log4j2
public class NativeRsocketClientFnF {
  public static void main(String[] args) {
    RSocketFactory.connect()
        .frameDecoder(ZERO_COPY)
        .errorConsumer(log::error)
        .transport(TcpClientTransport.create(8000))
        .start()
        .flatMap(rSocket -> rSocket.fireAndForget(DefaultPayload.create("ping")))
        .block();
  }
}

Как вы видите, я пытаюсь отправить "ping" в качестве данных полезной нагрузки от клиента кserver

Когда я запускаю сервер и запускаю клиент в первый раз, я вижу > from client: ping

Если я перезапускаю клиент снова, я не вижу никаких сообщений на сервере. Точка останова даже не попадает на сервер

Насколько я понимаю, Fire и Forget просто отправляют данные и не удосуживаются ждать и посмотреть, успешно ли сервер обрабатывает данные, но в моем случаеСам сервер не получает данные о последующих запусках клиента (как новых клиентов)

Есть ли что-то, чего мне не хватает?

Я использую версию 1.0.0-RC5 из rsocket-core &rsocket-transport-netty

ОС: Ubuntu 16.04

1 Ответ

1 голос
/ 23 октября 2019

Ваше понимание шаблона Fire and Forget правильное.

Проблема в том, io.rsocket.RSocketRequester#fireAndForget успешно завершена, прежде чем данные были отправлены в сеть. А когда вы блокируете его и выходите из клиентского приложения, соединение закрывается.

См. io.rsocket.RSocketRequester#handleFireAndForget

    return emptyUnicastMono()
        .doOnSubscribe(
            new OnceConsumer<Subscription>() {
              @Override
              public void acceptOnce(@Nonnull Subscription subscription) {
                ByteBuf requestFrame =
                    RequestFireAndForgetFrameFlyweight.encode(
                        allocator,
                        streamId,
                        false,
                        payload.hasMetadata() ? payload.sliceMetadata().retain() : null,
                        payload.sliceData().retain());
                payload.release();

                sendProcessor.onNext(requestFrame);
              }
            });

sendProcessor - процессор с промежуточным потоком. Он используется фактическим соединением, и отправка кадра запроса в сеть может быть отложена.

Вы можете добавить .then(Mono.delay(Duration.ofMillis(100))) до того, как .block() вызовет на стороне клиента в целях тестирования. Я думаю, что большинство реальных приложений не выйдут сразу после вызова fireAndForget.

...