Как объединить массив наблюдаемых в одну.Ошибка выброса. Свойство pipe не существует для типа Observable <Message>[]. - PullRequest
0 голосов
/ 21 декабря 2018

angular v 6.1.10
typcript v 2.9.2
rxjs v 6.3.3
ng2-stmompjs v 7.0.0

Я использую библиотеку ng2-stomp для веб-сокетов, которые создают наблюдаемый, инициирует подписку, которая является наблюдаемой.В моих требованиях я создаю несколько подписок на каналы на основе идентификатора приложения и теперь хочу подписать все эти каналы одновременно, или мы можем сказать, что наблюдаем более высокий порядок, поэтому попытался использовать различные операторы rxjs merge, mergeAll, concat но пока ничего не работает.Вот что я сделал до сих пор.

Сейчас этот работает

appList = [{appID: '123'}, {appID: '345'}];


 const appList$ = appList.map((appID: string, idx: number) => {
            const headers = Object.assign({}, this.headers, { id: `app_${idx}` });
            const watcher = this.rxStompService.watch(`/topic/${appID}`, headers);
            console.log({ watcher }); // This is observable
            return watcher;
        });



appList$.forEach((app$) => {
            app$.subscribe((message: Message) => {
                const notification: Notification = JSON.parse(message.body);
                this.totalNotificationCount++;
                if (Object.keys(notification).length) {
                    this.notificationMessages.push(notification);
                }
            });
        });

{
  "watcher": { "_isScalar": false, "source": { "source": { "_isScalar": false } }, "operator": { "connectable": { "source": { "_isScalar": false } } } }
}

НО Я думаю, что мы можем объединить все наблюдаемые в одной и подписать все.Обратите внимание, что я не могу использовать ForkJoin, потому что appList является динамическим и поэтому номер WebSocket.Ниже приведены мои рекомендации по преобразованию нескольких наблюдаемых в один раз.

Пробная версия 1: с использованием concat и map оператора

const batch = appList.map((appID, idx) => {
            console.log({ appID, idx });
            const headers = Object.assign({}, this.headers, { id: `app_${idx}` });
            const watcher = this.rxStompService.watch(`/topic/${appID}`, headers);
            return watcher;
        });



concat(...batch).pipe( map (i => i)).subscribe({ });

это дает ошибку:

Свойство 'pipe' не существует для типа 'MonoTypeOperatorFunction'.

пробная версия 2: использовать подписку всего после concat

 concat(...batch).subscribe({
            next: (v: any) => console.log(v),
            complete: () => console.log('Complete')
        });

Ошибка: подписка свойства ''не существует для типа' MonoTypeOperatorFunction '.

Трейл 3: использование pipe

const appList$ = appList.map((appID: string, idx: number) => {
            const headers = Object.assign({}, this.headers, { id: `app_${idx}` });
            const watcher = this.rxStompService.watch(`/topic/${appID}`, headers);
            return watcher;
        });

        console.log({ appList$ });
        appList$.pipe(
            takeUntil(this.ngUnsubscribe),
            tap((i) => {
                console.log('tapping', i);
            })
        );

console.log ({appList $}) возвращает это observable groups

  {
  "appList$": [
    {
      "_isScalar": false,
      "source": {
        "source": {
          "_isScalar": false
        }
      },
      "operator": {
        "connectable": {
          "source": {
            "_isScalar": false
          }
        }
      }
    },
    {
      "_isScalar": false,
      "source": {
        "source": {
          "_isScalar": false
        }
      },
      "operator": {
        "connectable": {
          "source": {
            "_isScalar": false
          }
        }
      }
    },
    {
      "_isScalar": false,
      "source": {
        "source": {
          "_isScalar": false
        }
      },
      "operator": {
        "connectable": {
          "source": {
            "_isScalar": false
          }
        }
      }
    },
    {
      "_isScalar": false,
      "source": {
        "source": {
          "_isScalar": false
        }
      },
      "operator": {
        "connectable": {
          "source": {
            "_isScalar": false
          }
        }
      }
    },
    {
      "_isScalar": false,
      "source": {
        "source": {
          "_isScalar": false
        }
      },
      "operator": {
        "connectable": {
          "source": {
            "_isScalar": false
          }
        }
      }
    },
    {
      "_isScalar": false,
      "source": {
        "source": {
          "_isScalar": false
        }
      },
      "operator": {
        "connectable": {
          "source": {
            "_isScalar": false
          }
        }
      }
    }
  ]
}

Ошибка: свойство 'pipe' не существует для типа 'Observable []'

Поэтому мой вопрос заключается в том, как объединить всенаблюдаемый в один раз и подписаться в один раз

1 Ответ

0 голосов
/ 21 декабря 2018

Это удивительно;всякий раз, когда я напишу вопрос здесь и попробую еще раз, я сам найду решение.

Я решил этот способ, используя from и mergeMap, и благодаря этой статье с подробным описанием

private watchApplications(appList: string[]) {
 const appList$ = from(appList).pipe(
            mergeMap((appID, idx) => {
                const headers = Object.assign({}, this.headers, { id: `app_${idx}` });
                const watcher = this.rxStompService.watch(`/topic/${appID}`, headers);
                return watcher;
            })
        );
        appList$
            .pipe(
                takeUntil(this.ngUnsubscribe),
                tap((f: Frame) => {
                    console.log('tapping Frame', f);
                })
            )
            .subscribe((message: Message) => {
                const notification: Notification = JSON.parse(message.body);
                console.log({ notification });
                this.totalNotificationCount++;
                if (Object.keys(notification).length) {
                    this.notificationMessages.push(notification);
                }
            });

}
...