У меня есть REST API, использующий Spring RestController, который также должен связываться с внутренним сервером через TCP. Теперь, используя Project Reactor TcpClient , я пытаюсь определить эффективный способ отправки исходящего сообщения, а затем использовать входящее сообщение и вернуть его как ответ HTTP-клиента. Я предполагаю, что в некотором смысле обработка исходящих и входящих сообщений TcpClient аналогична HTTP-запросу и ответу WebClient. В основном, шаги следующие:
- Приходит HTTP-запрос POST, который будет обрабатываться RestController.
- Затем запрос будет преобразован в правильный формат и будет отправлено через TCP с использованием
tcpClientConnection.outbound().sendByteArray(formattedRequest).subscribe()
. - Подписчик будет прослушивать входящий ответ TcpClient через
tcpClientConnection.inbound().receive().asByteArray().subscribe(bytes-> someImplementation)
. - Значение, возвращенное этим входящим процессом, будет затем возвращено в качестве ответа на контроллер.
Также важно отметить, что соединение TcpClient является постоянным.
Ниже приведены примеры упрощенных фрагментов кода:
Контроллер
@RestController
@RequestMapping("/client")
public class ClientController {
private final SampleTcpClient sampleTcpClient;
@Inject
public ClientController(SampleTcpClient sampleTcpClient) {
this.sampleTcpClient = sampleTcpClient;
}
@PostMapping
Mono<ResponseEntity<Obj>> postValue(@RequestBody ClientRequest request) {
return this.sampleTcpClient
.sendRequest(request.getRequestAttr()) // say, a simple string for simplicity.
.map(str -> mapToObjResponse);
}
}
TCP-клиент
import reactor.core.Disposable;
import reactor.core.publisher.DirectProcessor;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.netty.Connection;
import reactor.netty.tcp.TcpClient;
@Component
public class SampleTcpClient {
private Connection clientConn;
private DirectProcessor<byte[]> directProcessor;
private FluxSink<byte[]> fluxSink;
// Constructor
public SampleTcpClient() {
this.msgProcessor = DirectProcessor.create();
this.fluxSink = directProcessor.sink();
this.clientConn = TcpClient.create()
.host("serverHostIPHere")
.port(1234)
.handle((inbound, outbound) -> {
// setup inbound here.
inbound.receive()
.asByteArray()
.onBackpressureBuffer()
.subscribe(bytes -> this.fluxSink.next(bytes));
return Mono.never();
});
}
public Mono<String> sendRequest(String requestVal) {
return Mono.create(sink -> {
// A sequence number to track which response message goes
// to which request message.
final long sequenceNo = generateSequenceNo();
// setup DirectProcessor subscriber here.
final Disposable sub = this.directProcessor
.subscribe(bytes -> {
if (getSequenceNo(bytes) == sequenceNo) {
sink.success(parseBytesToResponseString(bytes));
}
});
// dispose the subscriber.
sink.onDispose(() -> sub.dispose());
this.clientConn
.outbound()
.sendByteArray(Mono.just(requestVal.getBytes())) // imagine the sequenceNo is included in the request.
.subscribe();
});
}
}
Проблемы, связанные с приведенным выше кодом:
inbound.receive()
можно выполнить только один раз. В противном случае будет выдана ошибка java.lang.IllegalStateException: Only one connection receive subscriber allowed
. - , что заставило меня поэкспериментировать с
FluxProcessor
, например DirectProcessor
, как показано в примере кода. Однако, согласно Project Reactor Processor , здесь упоминается, что мы должны стараться избегать использования процессоров, если это возможно. - Я не удовлетворен тем, как у меня есть подписчик на DirectProcessor, который я затем избавиться после. Не уверен, что это правильный способ.
Приведенный выше код работает, но мне кажется, что я делаю что-то неуместное, и есть более простой и эффективный способ сделать то, что я пытаюсь достичь. Любая помощь будет принята с благодарностью!