Мне удалось найти решение, принудительно закрыв сокет, если больше тиков не было найдено:
static checkTicks () {
if (!this.lastTick)
return setTimeout(this.checkTicks.bind(this), 5000);
if (this.lastTick + 10000 > Date.now())
return setTimeout(this.checkTicks.bind(this), 5000);
this.lastTick = null;
this.socket.unsubscribe();
/* eslint no-underscore-dangle: 0 */
if (!this.cexSocket._socket)
this.cexSocket._connectSocket();
this.cexSocket._socket.close();
this.socket.subscribe(this.addEvents.bind(this));
setTimeout(this.checkTicks.bind(this), 5000);
}
Мне также пришлось отписаться, чтобы снова запустить аутентификацию.Вот моя функция добавления событий на случай, если кому-то интересно узнать о потоке:
static addEvents (eventGroup) {
({
disconnecting: grp => grp.subscribe(msg => console.warn(msg)),
connected: grp => grp.subscribe(() => {
this.authenticate();
}),
ping: grp => grp.subscribe(() => this.send({ e: 'pong' })),
auth: () => this.registerTicker(),
tick: grp => {
this.ticker = grp
.pipe(
map(
({
data: {
symbol1: base,
symbol2: quote,
price
}
}) => ({
base,
quote,
price
}))
)
.pipe(share());
grp.subscribe(() => {
this.lastTick = Date.now();
});
}
})[eventGroup.key](eventGroup);
}
Обновление :
Я переписал все это.Теперь я просто пытаюсь отправить на сервер, и повторная попытка перехватит его, если соединение потеряно:
export const getTicker = () => {
const ticker = cexSocket
.pipe(retryWhen(errors => {
delay(1000);
return errors;
}))
.pipe(repeatWhen(complete => {
delay(1000);
return complete;
}))
.pipe(groupBy(({ e }) => e))
.pipe(flatMap(grp => {
const o = addEvents(grp);
if (!o)
return Observable.create();
return o;
}));
//This is to make sure the server is still there
const check = interval(15000).pipe(switchMap(() => {
cexSocket.next(JSON.stringify({ e: 'ping' }));
return ticker;
}));
return check;
};