RXJS6 Проблема подписки на субъект веб-сокета с Redux Observables - PullRequest
0 голосов
/ 26 сентября 2018

Я пытаюсь установить соединение wensocket по требованию в своем приложении React-Redux.RxJS предоставляет webscoketSubject для управления веб-корзинами.При получении действия я хочу установить соединение с помощью webscoketSubject, для последующего запуска запроса webscokets я бы использовал сокет, если соединение уже установлено.Но когда я это делаю, создается новая подписка в ссылке на webscoketSubject, которую я храню в памяти, и поэтому при получении сообщения от сервера я получаю повторяющиеся действия.

Вот фрагмент кодакод.

import { ofType } from 'redux-observable';
import { delay, mapTo, switchMap, mergeMap, map, skipUntil, skipWhile, takeUntil, catchError, take } from 'rxjs/operators';
import { Observable, DataEvent, WebSocket, of } from 'rxjs';
import { WebSocketSubject } from 'rxjs/webSocket';

let socket;

const getWebSocket = () => {
    console.log("socket", socket);
    if(socket) {
        console.log(socket.closed);
    }
    if(!socket) {
        console.log("creating new socket");
        socket = new WebSocketSubject("ws://localhost:9091/web-socket");
    }
    return socket;
}

//maps a websocket recieved message to redux store types
const websocketMsgMapper = payload => {
    return {
        type : 'WORKFLOWS_RECEIVED',
        workflows : payload.workflows
    }
};


export const openSocketEpic = action$ => action$.pipe(
    ofType('START_WEBSOCKET'),
    mergeMap( action => {
        const socket$ = getWebSocket();
        console.log(socket$);
        return socket$.pipe(            
            map(websocketMsgMapper),
            takeUntil(action$.pipe(ofType('STOP_WEBSOCKET'))),
            catchError(error => of({ 
                    type : 'WORKFLOWS_RECEIVED_ERROR'
                }))
        );
    })
    );
...