Spring / Project Reactor TcpClient настройка исходящих / входящих с HTTP-запросом / ответом - PullRequest
0 голосов
/ 20 июня 2020

У меня есть REST API, использующий Spring RestController, который также должен связываться с внутренним сервером через TCP. Теперь, используя Project Reactor TcpClient , я пытаюсь определить эффективный способ отправки исходящего сообщения, а затем использовать входящее сообщение и вернуть его как ответ HTTP-клиента. Я предполагаю, что в некотором смысле обработка исходящих и входящих сообщений TcpClient аналогична HTTP-запросу и ответу WebClient. В основном, шаги следующие:

  1. Приходит HTTP-запрос POST, который будет обрабатываться RestController.
  2. Затем запрос будет преобразован в правильный формат и будет отправлено через TCP с использованием tcpClientConnection.outbound().sendByteArray(formattedRequest).subscribe().
  3. Подписчик будет прослушивать входящий ответ TcpClient через tcpClientConnection.inbound().receive().asByteArray().subscribe(bytes-> someImplementation).
  4. Значение, возвращенное этим входящим процессом, будет затем возвращено в качестве ответа на контроллер.

Также важно отметить, что соединение TcpClient является постоянным.

Ниже приведены примеры упрощенных фрагментов кода:

Контроллер

@RestController
@RequestMapping("/client")
public class ClientController {

  private final SampleTcpClient sampleTcpClient;

  @Inject
  public ClientController(SampleTcpClient sampleTcpClient) {
    this.sampleTcpClient = sampleTcpClient;
  }

  @PostMapping
  Mono<ResponseEntity<Obj>> postValue(@RequestBody ClientRequest request) {
     return this.sampleTcpClient
                .sendRequest(request.getRequestAttr()) // say, a simple string for simplicity.
                .map(str -> mapToObjResponse);

  }
}

TCP-клиент

import reactor.core.Disposable;
import reactor.core.publisher.DirectProcessor;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.netty.Connection;
import reactor.netty.tcp.TcpClient;


@Component
public class SampleTcpClient {

  private Connection clientConn;

  private DirectProcessor<byte[]> directProcessor;

  private FluxSink<byte[]> fluxSink;

  // Constructor
  public SampleTcpClient() {
      this.msgProcessor = DirectProcessor.create();
      this.fluxSink = directProcessor.sink();
      this.clientConn = TcpClient.create()
                .host("serverHostIPHere")
                .port(1234)
                .handle((inbound, outbound) -> {
                    // setup inbound here.
                    inbound.receive()
                           .asByteArray()
                           .onBackpressureBuffer()
                           .subscribe(bytes -> this.fluxSink.next(bytes));

                    return Mono.never();
                });
  }

  public Mono<String> sendRequest(String requestVal) {
      return Mono.create(sink -> {
          // A sequence number to track which response message goes 
          // to which request message.
          final long sequenceNo = generateSequenceNo();

          // setup DirectProcessor subscriber here.
          final Disposable sub = this.directProcessor
                    .subscribe(bytes -> {
                        if (getSequenceNo(bytes) == sequenceNo) {
                            sink.success(parseBytesToResponseString(bytes));
                        }
                    });

           // dispose the subscriber.
           sink.onDispose(() -> sub.dispose());

          this.clientConn
              .outbound()
              .sendByteArray(Mono.just(requestVal.getBytes())) // imagine the sequenceNo is included in the request.
              .subscribe();
      });

  }

}

Проблемы, связанные с приведенным выше кодом:

  1. inbound.receive() можно выполнить только один раз. В противном случае будет выдана ошибка java.lang.IllegalStateException: Only one connection receive subscriber allowed.
  2. , что заставило меня поэкспериментировать с FluxProcessor, например DirectProcessor, как показано в примере кода. Однако, согласно Project Reactor Processor , здесь упоминается, что мы должны стараться избегать использования процессоров, если это возможно.
  3. Я не удовлетворен тем, как у меня есть подписчик на DirectProcessor, который я затем избавиться после. Не уверен, что это правильный способ.

Приведенный выше код работает, но мне кажется, что я делаю что-то неуместное, и есть более простой и эффективный способ сделать то, что я пытаюсь достичь. Любая помощь будет принята с благодарностью!

...