rxjs websocket - потеря соединения с интернетом - PullRequest
0 голосов
/ 24 октября 2018

Я не могу найти способ повторного выполнения конвейера, если клиент потерял соединение с классом веб-сокета rxjs.

this.cexSocket = webSocket({ url: 'wss://ws.cex.io/ws/', WebSocketCtor: websocketModule.default });

    this.socket = this.cexSocket
        .pipe(retryWhen(errors => {
            delay(1000);
            return errors;
        }))
        .pipe(repeatWhen(complete => {
            delay(1000);
            return complete;
        }))
        .pipe(groupBy(({ e }) => e));

    this.socket.subscribe(this.addEvents.bind(this));

    setTimeout(this.checkTicks.bind(this), 5000);

Если клиент не получает отметки с сервера, он должен закрыть соединение и пройти аутентификацию.снова.Функция проверки тиков:

static checkTicks () {
    if (!this.lastTick)
        return setTimeout(this.checkTicks.bind(this), 5000);
    if (this.lastTick + 10000 > Date.now())
        return setTimeout(this.checkTicks.bind(this), 5000);

    this.lastTick = null;
    //??? this.socket.error() does not work as expected
}

1 Ответ

0 голосов
/ 24 октября 2018

Мне удалось найти решение, принудительно закрыв сокет, если больше тиков не было найдено:

static checkTicks () {
    if (!this.lastTick)
        return setTimeout(this.checkTicks.bind(this), 5000);
    if (this.lastTick + 10000 > Date.now())
        return setTimeout(this.checkTicks.bind(this), 5000);

    this.lastTick = null;
    this.socket.unsubscribe();
    /* eslint no-underscore-dangle: 0 */
    if (!this.cexSocket._socket)
        this.cexSocket._connectSocket();
    this.cexSocket._socket.close();
    this.socket.subscribe(this.addEvents.bind(this));
    setTimeout(this.checkTicks.bind(this), 5000);
}

Мне также пришлось отписаться, чтобы снова запустить аутентификацию.Вот моя функция добавления событий на случай, если кому-то интересно узнать о потоке:

static addEvents (eventGroup) {
    ({
        disconnecting: grp => grp.subscribe(msg => console.warn(msg)),
        connected: grp => grp.subscribe(() => {
            this.authenticate();
        }),
        ping: grp => grp.subscribe(() => this.send({ e: 'pong' })),
        auth: () => this.registerTicker(),
        tick: grp => {
            this.ticker = grp
                .pipe(
                    map(
                        ({ 
                            data: { 
                                symbol1: base, 
                                symbol2: quote, 
                                price
                            }
                        }) => ({
                            base,
                            quote,
                            price
                        }))
                )
                .pipe(share());
            grp.subscribe(() => {
                this.lastTick = Date.now();
            });
        }
    })[eventGroup.key](eventGroup);
}

Обновление :

Я переписал все это.Теперь я просто пытаюсь отправить на сервер, и повторная попытка перехватит его, если соединение потеряно:

export const getTicker = () => {
const ticker = cexSocket
    .pipe(retryWhen(errors => {
        delay(1000);
        return errors;
    }))
    .pipe(repeatWhen(complete => {
        delay(1000);
        return complete;
    }))
    .pipe(groupBy(({ e }) => e))
    .pipe(flatMap(grp => {
        const o = addEvents(grp);
        if (!o)
            return Observable.create();
        return o;
    }));

//This is to make sure the server is still there
const check = interval(15000).pipe(switchMap(() => {
    cexSocket.next(JSON.stringify({ e: 'ping' }));
    return ticker;
}));

return check;
};
...