Как обработать сообщение, отправленное с сервера клиенту с помощью RSocket? - PullRequest
0 голосов
/ 23 апреля 2020

Я пытаюсь использовать RSocketRequester для отправки сообщения с сервера на указанный c клиент, но я не знаю, как его обработать на внешнем интерфейсе. Сервер Spring Webflux с таким контроллером:

data class Message(val message: String)

@Controller
class RSocketController {

    private val log = LoggerFactory.getLogger(RSocketController::class.java)

    @MessageMapping("say.hello")
    fun sayHello(message: String): Flux<Message> {
        log.info("say hello {}", message)
        return Flux.just(Message("server says hello"))
    }

    @MessageMapping("say.hi")
    fun sayHi(message: String, rSocketRequester: RSocketRequester): Flux<Message> {
        log.info("say hi {}", message)
        rSocketRequester
                .route("say.hello")
                .data(Message("server says hi hello ;)"))
                .send()
                .subscribe()
        return Flux.just(Message("server says hi!!"))
    }

}

На внешнем интерфейсе я использую rsocket-js. Метод sayHello прекрасно работает (поток запросов), но когда я вызываю метод sayHi, я хочу отправить два сообщения с сервера. Первый до say.hello конечной точки, а второй до say.hi конечной точки. У меня есть rsocket-js реализация, подобная этой:

    sayHello() {
      console.log("say hello");
      this.requestStream("say.hello");
    },
    sayHi() {
      console.log("say hi");
      this.requestStream("say.hi");
    },
    connect() {
      const transport = new RSocketWebSocketClient({
        url: "ws://localhost:8080/rsocket"
      });
      const client = new RSocketClient({        
        serializers: {
          data: JsonSerializer,
          metadata: IdentitySerializer
        },
        setup: {                  
          keepAlive: 60000,          
          lifetime: 180000,
          dataMimeType: "application/json",
          metadataMimeType: "message/x.rsocket.routing.v0"
        },
        transport
      });
      client.connect().subscribe({
        onComplete: socket => {
          this.socket = socket;
          console.log("complete connection");          
        },
        onError: error => {
          console.log("got connection error");
          console.error(error);
        },
        onSubscribe: cancel => {
          console.log("subscribe connection");
          console.log(cancel);          
        }        
      });
    },    
    requestStream(url) {
      if (this.socket) {
        this.socket
          .requestStream({
            data: url + " from client",
            metadata: String.fromCharCode(url.length) + url
          })
          .subscribe({
            onComplete: () => console.log("requestStream done"),
            onError: error => {
              console.log("got error with requestStream");
              console.error(error);
            },
            onNext: value => {
              // console.log("got next value in requestStream..");
              console.log("got data from sever");
              console.log(value.data);
            },
            // Nothing happens until `request(n)` is called
            onSubscribe: sub => {
              console.log("subscribe request Stream!");
              sub.request(2147483647);
              // sub.request(3);
            }
          });
      } else {
        console.log("not connected...");
      }
    }

Я вижу оба сообщения в Google Chrome DevTools -> Network -> rsocket. Клиент получает их, но я не могу уловить код, отправленный RSocketRequester. Похоже, что сервер использует fireAndForget метод. Как справиться с этим на стороне клиента?

1 Ответ

0 голосов
/ 24 апреля 2020

Как сказал @VladMamaev, мы можем предоставить клиенту ответчик, как в этом примере https://github.com/rsocket/rsocket-js/blob/master/packages/rsocket-examples/src/LeaseClientExample.js#L104

Для меня достаточно метода fireAndForget.

export class EchoResponder {
  constructor(callback) {
    this.callback = callback;
  }
  fireAndForget(payload) {  
    this.callback(payload);
  }  
}

import { EchoResponder } from "~/assets/EchoResponder";
...

  const messageReceiver = payload => {
    //do what you want to do with received message  
    console.log(payload)    
  };
  const responder = new EchoResponder(messageReceiver);


  connect() {
  const transport = new RSocketWebSocketClient({
    url: "ws://localhost:8080/rsocket"
  });
  const client = new RSocketClient({        
    serializers: {
      data: JsonSerializer,
      metadata: IdentitySerializer
    },
    setup: {                  
      keepAlive: 60000,          
      lifetime: 180000,
      dataMimeType: "application/json",
      metadataMimeType: "message/x.rsocket.routing.v0"
    },
    responder: responder,
    transport
  });

...