Я пытаюсь отправить сообщения от службы. net службе sping boot по протоколу RSocket, соединение установлено, но сообщения не достигают пункта назначения. В обоих случаях. net сервер - java клиент и java сервер -. net клиент. Возможно, кто-то уже установил рабочую конфигурацию (. net - весенняя загрузка через rsocket)
Рабочая конфигурация для строковых сообщений (java сервер -. net клиент)
import io.rsocket.AbstractRSocket;
import io.rsocket.ConnectionSetupPayload;
import io.rsocket.DuplexConnection;
import io.rsocket.Payload;
import io.rsocket.RSocket;
import io.rsocket.RSocketFactory;
import io.rsocket.SocketAcceptor;
import io.rsocket.frame.decoder.PayloadDecoder;
import io.rsocket.transport.ServerTransport;
import io.rsocket.transport.netty.WebsocketDuplexConnection;
import io.rsocket.util.DefaultPayload;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.netty.Connection;
import reactor.netty.DisposableServer;
import reactor.netty.http.server.HttpServer;
ServerTransport.ConnectionAcceptor acceptor =
RSocketFactory.receive()
.frameDecoder(PayloadDecoder.ZERO_COPY)
.acceptor(new SocketAcceptorImpl())
.toConnectionAcceptor();
DisposableServer disposableServer =
HttpServer.create()
.host("localhost")
.port(serverPort)
.route(
routes ->
routes.ws(
"/",
(in, out) -> {
DuplexConnection connection = new WebsocketDuplexConnection((Connection) in);
return acceptor.apply(connection).then(out.neverComplete());
}))
.bindNow();
class SocketAcceptorImpl implements SocketAcceptor {
@Override
public Mono<RSocket> accept(ConnectionSetupPayload setupPayload, RSocket reactiveSocket) {
return Mono.just(
new AbstractRSocket() {
@Override
public Mono<Void> fireAndForget(Payload payload) {
payload.release();
return Mono.empty();
}
@Override
public Mono<Payload> requestResponse(Payload payload) {
return Mono.just(payload);
}
@Override
public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
return Flux.from(payloads).subscribeOn(Schedulers.single());
}
@Override
public Flux<Payload> requestStream(Payload payload) {
String dataUtf8 = payload.getDataUtf8();
return Flux.just(DefaultPayload.create("!!! " + dataUtf8 + " !!!"));
}
});
}
}