Я собираю UDP-сервер в Spring Boot 2.1.3 с Webflux 2.1.8, который собирает данные от UDP-клиентов.
Я хотел, чтобы он был построен полностью реактивно, но у меня возникли проблемы с пулами потоков.В настоящее время мое приложение работает только в одном потоке, что не соответствует ожиданиям.Во-первых, я создал простой Python-клиент, который ожидает ответа для проверки моего сервера
# Create a UDP socket at client side
UDPClientSocket = socket.socket(family=socket.AF_INET, type=socket.SOCK_DGRAM)
# Send to server using created UDP socket
UDPClientSocket.sendto(bytesToSend, serverAddressPort)
print("Receiving...")
msgFromServer = UDPClientSocket.recvfrom(bufferSize)
msg = "Message from Server {}".format(msgFromServer[0])
print(msg)
Теперь мой Spring-загрузочный UDP-сервер выглядит следующим образом:
@SpringBootApplication
public class DemoApplication {
public static void main(String[] args) {
SpringApplication.run(DemoApplication.class, args);
}
@Bean
CommandLineRunner serverRunner(UdpDecoderHandler udpDecoderHanlder, UdpEncoderHandler udpEncoderHandler, UdpHandler udpHandler) {
return strings -> {
createUdpServer(udpDecoderHanlder, udpEncoderHandler, udpHandler);
};
}
private void createUdpServer(UdpDecoderHandler udpDecoderHandler, UdpEncoderHandler udpEncoderHandler, UdpHandler udpHandler) {
UdpServer.create()
.handle((in,out) -> {
in.receive().asByteArray().subscribe();
return Flux.never();
})
.port(19001)
.doOnBound(conn -> conn
.addHandler("decoder", udpDecoderHandler)
.addHandler("encoder", udpEncoderHandler)
.addHandler("handler", udpHandler)
)
.bindNow(Duration.ofSeconds(30));
}
}
Декодер:
@Service
public class UdpDecoderHandler extends MessageToMessageDecoder<DatagramPacket> {
private static final Logger LOGGER = LoggerFactory.getLogger(UdpDecoderHandler.class);
@Override
protected void decode(ChannelHandlerContext channelHandlerContext, DatagramPacket datagramPacket, List<Object> out) {
ByteBuf byteBuf = datagramPacket.content();
byte[] data = new byte[byteBuf.readableBytes()];
byteBuf.readBytes(data);
String msg = new String(data);
LOGGER.info("Decoded data: " + msg);
InetSocketAddress s = datagramPacket.sender();
channelHandlerContext.fireChannelRead(s);
}
}
Кодировщик:
@Service
public class UdpEncoderHandler extends MessageToMessageEncoder {
@Override
protected void encode(ChannelHandlerContext ctx, Object o, List list) throws Exception {
InetSocketAddress socketAddress = (InetSocketAddress) o;
log.debug("Encode function...");
String msg = "Hello response!";
DatagramPacket response = new DatagramPacket(Unpooled.copiedBuffer(msg.getBytes()), socketAddress);
list.add(response);
}
}
Обработчик (здесь я добавил функцию сна для проверки моего совпадения):
@Service
public class UdpHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.info("Channel Read function");
InetSocketAddress message = (InetSocketAddress) msg;
Thread.sleep(5000);
ctx.channel().writeAndFlush(message).addListener((GenericFutureListener<Future<Void>>) future -> {
if(future.isDone() && future.isSuccess()) {
log.info("OK");
} else {
log.error("error " + future.isDone() + " - " + future.isSuccess());
if(!future.isSuccess()) {
future.cause().printStackTrace();
}
}
});
}
}
Имея такое, я запускаю свойСценарии Python сразу:
python3 udp-client.py & python3 udp-client.py
Появятся два сообщения с разницей в 5 с, что означает, что мое приложение работает в одном потоке.
Как настроить netty и webflux для работы с 4 потоками?