Пожалуйста!Используйте RSocket !
Это абсолютно правильный дизайн, и он стоит сэкономить ресурсы и использовать только соединение для каждого клиента для всех возможных операций.
Однако, нереализовать колесо и использовать протокол, который дает вам все эти виды связи.
- RSocket имеет модель запрос-ответ , которая позволяет вам выполнять самые распространенные на сегодняшний день взаимодействия клиент-сервер.
- RSocket имеет модель потока запросов , поэтому вы можете выполнить все свои потребности и вернуть поток событий, асинхронно используя одно и то же соединение.RSocket выполняет всю привязку логического потока к физическому соединению и обратно, поэтому вы не почувствуете боли, делая это самостоятельно.
- RSocket имеет гораздо больше моделей взаимодействия, таких как fire-and-Forgot и stream-stream , которые могут быть полезны в случае отправки потока данных обоими способами.
Как использовать RSocket в Spring
Один из вариантов сделать это - использовать RSocket-Java реализацию протокола RSocket.RSocket-Java построен поверх Project Reactor, поэтому он естественным образом соответствует экосистеме Spring WebFlux.
К сожалению, нет особой интеграции с экосистемой Spring.К счастью, я потратил пару часов на создание простого RSocket Spring Boot Starter , который интегрирует Spring WebFlux с RSocket и предоставляет сервер WebSocket RSocket вместе с сервером WebFlux Http.
Почему RSocket лучшеподход?
По сути, RSocket скрывает сложность реализации того же самого подхода самостоятельно.С RSocket нам не нужно заботиться об определении модели взаимодействия в качестве пользовательского протокола и реализации в Java.RSocket осуществляет доставку данных по определенному логическому каналу.Он предоставляет встроенный клиент, который отправляет сообщения на одно и то же соединение WS, поэтому нам не нужно изобретать для этого специальную реализацию.
Так как RSocket просто протокол, он не предоставляет никакого формата сообщений, поэтому эта задача для бизнес-логики.Однако существует проект RSocket-RPC, который предоставляет буфер протокола в качестве формата сообщения и использует ту же технику генерации кода, что и GRPC.Таким образом, используя RSocket-RPC, мы можем легко создать API для клиента и сервера, не обращая внимания на абстракцию транспорта и протоколов.
Та же самая интеграция RSocket Spring Boot обеспечивает пример RSocket-Использование RPC также.
Хорошо, это не убедило меня, я хочу иметь собственный сервер WebSocket по-прежнему
Так что для этой цели вы должны реализовать этот ад самостоятельно.Я уже делал это однажды, но не могу указать на этот проект, так как это корпоративный проект.Тем не менее, я могу поделиться несколькими примерами кода, которые могут помочь вам в создании правильного клиента и сервера.
На стороне сервера
Отображение обработчика и открытых логических подписчиков
ПервоеНеобходимо учитывать, что все логические потоки в пределах одного физического соединения должны храниться где-то:
class MyWebSocketRouter implements WebSocketHandler {
final Map<String, EnumMap<ActionMessage.Type, ChannelHandler>> channelsMapping;
@Override
public Mono<Void> handle(WebSocketSession session) {
final Map<String, Disposable> channelsIdsToDisposableMap = new HashMap<>();
...
}
}
В приведенном выше примере есть две карты.Первый - это ваше отображение маршрутов, которое позволяет вам определять маршрут на основе параметров входящего сообщения или около того.Второй создан для сценария использования потоков запросов (в моем случае это была карта активных подписок), так что вы можете отправить фрейм сообщения, который создает подписку, или подписать вас на определенное действие и сохранить эту подписку, чтобы отменить подписку.действие выполнено, и вы отмените подписку, если подписка существует.
Использовать процессор для мультиплексирования сообщений
Для отправки сообщений из всех логических потоков необходимо мультиплексировать сообщения в один поток.Например, используя Reactor, вы можете сделать это, используя UnicastProcessor
:
@Override
public Mono<Void> handle(WebSocketSession session) {
final UnicastProcessor<ResponseMessage<?>> funIn = UnicastProcessor.create(Queues.<ResponseMessage<?>>unboundedMultiproducer().get());
...
return Mono
.subscriberContext()
.flatMap(context -> Flux.merge(
session
.receive()
...
.cast(ActionMessage.class)
.publishOn(Schedulers.parallel())
.doOnNext(am -> {
switch (am.type) {
case CREATE:
case UPDATE:
case CANCEL: {
...
}
case SUBSCRIBE: {
Flux<ResponseMessage<?>> flux = Flux
.from(
channelsMapping.get(am.getChannelId())
.get(ActionMessage.Type.SUBSCRIBE)
.handle(am) // returns Publisher<>
);
if (flux != null) {
channelsIdsToDisposableMap.compute(
am.getChannelId() + am.getSymbol(), // you can generate a uniq uuid on the client side if needed
(cid, disposable) -> {
...
return flux
.subscriberContext(context)
.subscribe(
funIn::onNext, // send message to a Processor manually
e -> {
funIn.onNext(
new ResponseMessage<>( // send errors as a messages to Processor here
0,
e.getMessage(),
...
ResponseMessage.Type.ERROR
)
);
}
);
}
);
}
return;
}
case UNSABSCRIBE: {
Disposable disposable = channelsIdsToDisposableMap.get(am.getChannelId() + am.getSymbol());
if (disposable != null) {
disposable.dispose();
}
}
}
})
.then(Mono.empty()),
funIn
...
.map(p -> new WebSocketMessage(WebSocketMessage.Type.TEXT, p))
.as(session::send)
).then()
);
}
. Как видно из приведенного выше примера, здесь есть куча вещей:
- Сообщение должно содержать информацию о маршруте
- Сообщение должно содержать уникальный идентификатор потока, к которому оно относится.
- Отдельный процессор для мультиплексирования сообщений, где ошибкой также должно быть сообщение
- Каждый канал должен храниться где-то, в этом случае у нас есть простой случай использования, где каждое сообщение может содержать
Flux
сообщений или просто Mono
(в случае моно это может быть реализовано прощена стороне сервера, поэтому вам не нужно хранить уникальный идентификатор потока). - Этот образец не включает в себя кодирование-декодирование сообщений, поэтому этот вызов остается за вами.
Клиентская сторона
Клиент также не так прост:
Обработка сеанса
Для обработки соединения нам нужно выделить два процессора, чтобы далее мы могли использовать их для мультиплексирования иДемультиплексированные сообщения:
UnicastProcessor<> outgoing = ...
UnicastPorcessor<> incoming = ...
(session) -> {
return Flux.merge(
session.receive()
.subscribeWith(incoming)
.then(Mono.empty()),
session.send(outgoing)
).then();
}
Храните где-нибудь все логические потоки
Все созданные потоки, будь то Mono
или Flux
, должны храниться где-то, поэтому мыбудет в состоянии различить, к какому потоковому сообщению относится:
Map<String, MonoSink> monoSinksMap = ...;
Map<String, FluxSink> fluxSinksMap = ...;
мы должны сохранить две карты начиная с MonoSink, и FluxSink не имеет того же родительского интерфейса.
Маршрутизация сообщений
В приведенных выше примерах мы только что рассмотрели начальную часть клиентской части.Теперь нам нужно построить механизм маршрутизации сообщений:
...
.subscribeWith(incoming)
.doOnNext(message -> {
if (monoSinkMap.containsKey(message.getStreamId())) {
MonoSink sink = monoSinkMap.get(message.getStreamId());
monoSinkMap.remove(message.getStreamId());
if (message.getType() == SUCCESS) {
sink.success(message.getData());
}
else {
sink.error(message.getCause());
}
} else if (fluxSinkMap.containsKey(message.getStreamId())) {
FluxSink sink = fluxSinkMap.get(message.getStreamId());
if (message.getType() == NEXT) {
sink.next(message.getData());
}
else if (message.getType() == COMPLETE) {
fluxSinkMap.remove(message.getStreamId());
sink.next(message.getData());
sink.complete();
}
else {
fluxSinkMap.remove(message.getStreamId());
sink.error(message.getCause());
}
}
})
В приведенном выше примере кода показано, как мы можем маршрутизировать входящие сообщения.
Мультиплексные запросы
Последняя часть - сообщениямультиплексирование.Для этого мы рассмотрим возможный класс отправителя impl:
class Sender {
UnicastProcessor<> outgoing = ...
UnicastPorcessor<> incoming = ...
Map<String, MonoSink> monoSinksMap = ...;
Map<String, FluxSink> fluxSinksMap = ...;
public Sender () {
// создайте здесь соединение через веб-сокет и поместите код, упомянутый ранее}
Mono<R> sendForMono(T data) {
//generate message with unique
return Mono.<R>create(sink -> {
monoSinksMap.put(streamId, sink);
outgoing.onNext(message); // send message to server only when subscribed to Mono
});
}
Flux<R> sendForFlux(T data) {
return Flux.<R>create(sink -> {
fluxSinksMap.put(streamId, sink);
outgoing.onNext(message); // send message to server only when subscribed to Flux
});
}
}
Сумма реализации Custom
- Хардкор
- Не реализована поддержка противодавления, что может стать еще одной проблемой
- Легко выстрелить себе в ногу
Еда на вынос
- ПОЖАЛУЙСТА, используйте RSocket, не придумывайте протокол самостоятельно, это ТРУДНО !!!
- Чтобы узнать больше о RSocket от ребят из Pivotal - https://www.youtube.com/watch?v=WVnAbv65uCU
- Чтобы узнать большео RSocket из одного из моих выступлений - https://www.youtube.com/watch?v=XKMyj6arY2A
- На платформе RSocket построен специальный фреймворк под названием Proteus - вас это может заинтересовать - https://www.netifi.com/
- Чтобы узнать больше оProteus от разработчика ядра протокола RSocket - https://www.google.com/url?sa=t&source=web&rct=j&url=https://m.youtube.com/watch%3Fv%3D_rqQtkIeNIQ&ved=2ahUKEwjpyLTpsLzfAhXDDiwKHUUUA8gQt9IBMAR6BAgNEB8&usg=AOvVaw0B_VdOj42gjr0YrzLLUX1E