angular v 6.1.10
typcript v 2.9.2
rxjs v 6.3.3
ng2-stmompjs v 7.0.0
Я использую библиотеку ng2-stomp для веб-сокетов, которые создают наблюдаемый, инициирует подписку, которая является наблюдаемой.В моих требованиях я создаю несколько подписок на каналы на основе идентификатора приложения и теперь хочу подписать все эти каналы одновременно, или мы можем сказать, что наблюдаем более высокий порядок, поэтому попытался использовать различные операторы rxjs merge
, mergeAll
, concat
но пока ничего не работает.Вот что я сделал до сих пор.
Сейчас этот работает
appList = [{appID: '123'}, {appID: '345'}];
const appList$ = appList.map((appID: string, idx: number) => {
const headers = Object.assign({}, this.headers, { id: `app_${idx}` });
const watcher = this.rxStompService.watch(`/topic/${appID}`, headers);
console.log({ watcher }); // This is observable
return watcher;
});
appList$.forEach((app$) => {
app$.subscribe((message: Message) => {
const notification: Notification = JSON.parse(message.body);
this.totalNotificationCount++;
if (Object.keys(notification).length) {
this.notificationMessages.push(notification);
}
});
});
{
"watcher": { "_isScalar": false, "source": { "source": { "_isScalar": false } }, "operator": { "connectable": { "source": { "_isScalar": false } } } }
}
НО Я думаю, что мы можем объединить все наблюдаемые в одной и подписать все.Обратите внимание, что я не могу использовать ForkJoin
, потому что appList является динамическим и поэтому номер WebSocket.Ниже приведены мои рекомендации по преобразованию нескольких наблюдаемых в один раз.
Пробная версия 1: с использованием concat
и map
оператора
const batch = appList.map((appID, idx) => {
console.log({ appID, idx });
const headers = Object.assign({}, this.headers, { id: `app_${idx}` });
const watcher = this.rxStompService.watch(`/topic/${appID}`, headers);
return watcher;
});
concat(...batch).pipe( map (i => i)).subscribe({ });
это дает ошибку:
Свойство 'pipe' не существует для типа 'MonoTypeOperatorFunction'.
пробная версия 2: использовать подписку всего после concat
concat(...batch).subscribe({
next: (v: any) => console.log(v),
complete: () => console.log('Complete')
});
Ошибка: подписка свойства ''не существует для типа' MonoTypeOperatorFunction '.
Трейл 3: использование pipe
const appList$ = appList.map((appID: string, idx: number) => {
const headers = Object.assign({}, this.headers, { id: `app_${idx}` });
const watcher = this.rxStompService.watch(`/topic/${appID}`, headers);
return watcher;
});
console.log({ appList$ });
appList$.pipe(
takeUntil(this.ngUnsubscribe),
tap((i) => {
console.log('tapping', i);
})
);
console.log ({appList $}) возвращает это
{
"appList$": [
{
"_isScalar": false,
"source": {
"source": {
"_isScalar": false
}
},
"operator": {
"connectable": {
"source": {
"_isScalar": false
}
}
}
},
{
"_isScalar": false,
"source": {
"source": {
"_isScalar": false
}
},
"operator": {
"connectable": {
"source": {
"_isScalar": false
}
}
}
},
{
"_isScalar": false,
"source": {
"source": {
"_isScalar": false
}
},
"operator": {
"connectable": {
"source": {
"_isScalar": false
}
}
}
},
{
"_isScalar": false,
"source": {
"source": {
"_isScalar": false
}
},
"operator": {
"connectable": {
"source": {
"_isScalar": false
}
}
}
},
{
"_isScalar": false,
"source": {
"source": {
"_isScalar": false
}
},
"operator": {
"connectable": {
"source": {
"_isScalar": false
}
}
}
},
{
"_isScalar": false,
"source": {
"source": {
"_isScalar": false
}
},
"operator": {
"connectable": {
"source": {
"_isScalar": false
}
}
}
}
]
}
Ошибка: свойство 'pipe' не существует для типа 'Observable []'
Поэтому мой вопрос заключается в том, как объединить всенаблюдаемый в один раз и подписаться в один раз