Хорошо, я новичок в RSocket. Я пытаюсь создать простой клиент RSocket и простой сервер RSocket. Из проведенного мною исследования говорится, что RSocket поддерживает возобновление:
Это особенно полезно, поскольку при отправке кадра RESUME, содержащего информацию о последнем принятом кадре, клиент может возобновить соединение и только запросить данные, которые он еще не получил, избегая ненужной загрузки на сервер и тратя время на попытки извлечь данные, которые уже были извлечены.
Также говорится, что клиент является ответственным за включение возобновления. У меня вопрос, как включить это возобновление и как отправить этот кадр RESUME. У меня есть работающий клиент и сервер, но если я выключаю сервер и запускаю его снова, ничего не происходит, а позже, когда клиент пытается снова связаться с сервером, он выдает: java .nio.channels.ClosedChannelException.
Это моя конфигурация клиента:
@Configuration
public class ClientConfiguration {
/**
* Defining the RSocket client to use tcp transport on port 7000
*/
@Bean
public RSocket rSocket() {
return RSocketFactory
.connect()
.resumeSessionDuration(Duration.ofDays(10))
.mimeType(MimeTypeUtils.APPLICATION_JSON_VALUE, MimeTypeUtils.APPLICATION_JSON_VALUE)
.frameDecoder(PayloadDecoder.ZERO_COPY)
.transport(TcpClientTransport.create(7000))
.start()
.block();
}
/**
* RSocketRequester bean which is a wrapper around RSocket
* and it is used to communicate with the RSocket server
*/
@Bean
RSocketRequester rSocketRequester(RSocketStrategies rSocketStrategies) {
return RSocketRequester.wrap(rSocket(), MimeTypeUtils.APPLICATION_JSON, MimeTypeUtils.APPLICATION_JSON, rSocketStrategies);
}
}
И это RestController, с которого я начинаю связь с сервером rsocket:
@RestController
public class UserDataRestController {
private final RSocketRequester rSocketRequester;
public UserDataRestController(RSocketRequester.Builder rSocketRequester) {
this.rSocketRequester = rSocketRequester.connectTcp("localhost", 7000).block();
}
@GetMapping(value = "/feed/{firstName}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Publisher<Person> feed(@PathVariable("firstName") String firstName) {
return rSocketRequester
.route("feedPersonData")
.data(new PersonDataRequest(firstName))
.retrieveFlux(Person.class);
}
}