Я пытаюсь мультиплексировать соединение WebSocket, чтобы подписаться на несколько каналов, используя RxJS WebSocketSubject. Проблема в том, что я пытаюсь соответствовать протоколу Bayeux, и нет четкого способа отправить дополнительные данные о рукопожатии / подключении до того, как WebSocketSubject отправит сообщение подписки. Я взломал наблюдателя для отправки дополнительных данных, но мне нужен способ получить ответ от сервера, чтобы я мог использовать свою возвращенную клиентку в сообщениях подписки. Для этого я взломал метод map, чтобы выполнить некоторую работу, прежде чем он вернет следующее действие, но именно в этот момент я решил, что должен быть лучший способ. Вот мой код:
import { webSocket } from 'rxjs/webSocket';
import { filter, mergeMap, map } from 'rxjs/operators'
var assignedClientID = '';
const socket$ = webSocket({
url: 'wss://sn-62092.local/bayeux/websocket',
openObserver: {
next: (e) => {
e.target.send(JSON.stringify([{
channel: '/meta/handshake',
version: '1.0',
supportedConnectionTypes: ['websocket']
}]));
e.target.send(JSON.stringify([{
channel: '/meta/connect',
clientId: assignedClientID,
connectionType: 'websocket'
}]));
}
},
});
export const bayeuxWebsocket = (action$, store) =>
action$.pipe(
filter(action => action.type === 'BAYEUX_SUBSCRIBE'),
mergeMap(action =>
socket$.multiplex(
() => ([{
channel: '/meta/subscribe',
clientId: assignedClientID,
subscription: action.subscription,
}]),
() => ([{
channel: '/meta/unsubscribe',
clientId: assignedClientID,
subscription: action.subscription,
}]),
message => message
).pipe(
map(data => {
switch (data.channel) {
case "/meta/handshake":
assignedClientID = data.clientId;
break;
default:
break;
}
return {type: 'NEXT_ACTION'}
})
)
)
);