Есть ли способ использовать наблюдаемую функцию возврата для каждого элемента другого наблюдаемого массива? - PullRequest
1 голос
/ 11 июня 2019

Я получаю Observable<Group[]> из моей коллекции Firebase.В этом классе группы есть идентификатор, который я хочу использовать для получения другого массива наборов данных из Firebase, который будет сообщениями для каждой уникальной группы Observable<Message[]>. (Каждая группа имеет свой собственный чат: Message[]) И он хочет вернуть наблюдаемыйкоторые содержат массив нового типа: return { ...group, messages: Message[] } as GroupWithMessages

конечная цель должна быть Observable<GroupWithMessages[]>

getGroupWithChat(): Observable<GroupWithMessages[]> {
    const groupColl = this.getGroups(); // Observable<Group[]>

    const messages = groupColl.pipe(
      map(groups => {
        return groups.map(meet => {
          const messages = this.getMessagesFor(group.uid);
          return { messages:messages, ...group} as GroupWithMessages
        });
      })
    );

    return messages;

  }
}

и здесь функция сообщения

 getMessagesFor(id: string): Observable<Message[]> {
    return this.afs.collection<Message>(`meets/${id} /messages`).valueChanges();
}

к сожалениюэто не работает, потому что когда я создаю новый Obj, я не могу связать messages:messages, потому что сообщения ist vom typ Observable<Message[]>

Я надеюсь, что это очищает вещи

ОБНОВЛЕНИЕ: мойТеперь основная проблема сводится к следующему:

getGroupsWithMessages() {
    this.getJoinedGroups()
      .pipe(
        mergeMap(groups =>
          from(groups).pipe(
            mergeMap(group => {
              return this.getMessagesFor(group.uid).pipe(
                map(messages => {
                  return { ...group, messages } as GroupIdMess;
                })
              );
            }),
            tap(x => console.log('reaching here: ', x)),
            toArray(),
            tap(x => console.log('not reaching here = completed: ', x))
          )
        ),
        tap(x => console.log('not reaching here: ', x))
      )
      .subscribe(x => console.log('not reaching here: ', x));
  }

, когда я вызываю эту функцию, мой console.log выглядит следующим образом:

enter image description here

Ответы [ 3 ]

0 голосов
/ 12 июня 2019

Используйте forkJoin, чтобы дождаться получения сообщений для всех групп.Затем сопоставьте результат forkJoin с массивом GroupWithMessages следующим образом -

getGroupWithChat(): Observable<GroupWithMessages[]> {

    return this.getGroups()
               .pipe(
                 switchMap(groups => {
                   const messagesForAllGroups$ = groups.map(group => this.getMessagesFor(group.uid));
                   return forkJoin(messagesForAllGroups$)
                          .pipe(
                            map(joined => {

                              //joined has response like -
                              //[messagesArrayForGroup0, messagesArrayForGroup1, messagesArrayForGroup2....];

                              const messagesByGroup = Array<GroupWithMessages>();

                              groups.forEach((group, index) => {
                                //assuming that GroupWithMessages has group and messages properties.
                                const gm = new GroupWithMessages();
                                gm.group = group;
                                gm.messages = joined[index];
                                messagesByGroup.push(gm);
                              });

                              return messagesByGroup;
                            })
                          )
                 })
               )    

  }
0 голосов
/ 12 июня 2019

Обычно я делаю это, разбивая Observable<any[]> на Observable<any>, а затем mergeMap результаты на внутреннюю Наблюдаемую.

Примерно так должно работать:

  getMessagesFor(id: string): Observable<number> {
    return of(1);
  }

  getGroups(): Observable<string[]> {
    return of(["1", "2"]);
  }

  getGroupWithChat() {
    this.getGroups().pipe(
      mergeMap(groups => from(groups)), // Split the stream into individual group elements instead of an array
      mergeMap(group => {
        return this.getMessagesFor(group).pipe(
          map(messages => {
            return Object.assign(group, messages);
          })
        );
      })
    );
  }

Edit:

Рассмотрим поведение субъекта. Это совсем не завершено:

const behSub: BehaviorSubject<number[]> = new BehaviorSubject([1, 2, 3]);

setTimeout(() => {
  behSub.next([4, 5, 6]);
}, 5000);

behSub
  .pipe(
    mergeMap(arr =>
      from(arr).pipe(
        tap(), // Do something with individual items, like mergeMap to messages
        toArray() // Go back to array
      )
    )
  )
  .subscribe(console.log, null, () => {
    console.log('Complete');
  });
0 голосов
/ 11 июня 2019

Не уверен, буду ли я следовать тому, что вы делаете здесь, но логика выглядит так, как вы хотели бы:

getGroupWithChat() {
    return this.getGroups.pipe(map(groups=> {
        return groups.map(group => this.getMessagesFor(group.uid));
    })).subscribe(); // trigger "hot" observable
}

Дайте мне знать, если я смогу помочь вам после того, как вы уточнить.

ОБНОВЛЕНИЕ:

Похоже, вам нужно получить UID группы, прежде чем делать вызов, чтобы получить GroupMessages []?

  1. get Group: Observable
  2. call getMessagesFor (Group.uid)

этот пример получает результат групп $, тогда concatMap использует результат групп $ для выполнения запроса сообщений

 this.getGroups().pipe(
     concatMap((group: Group) => this.getMessagesFor(group.uid))
 ).subscribe((messages: GroupWithMessages[]) => {
    console.log(messages); 
 });

Возможно, вы все еще хотите отобразить их вместе, но кажется, что вы знаете, как это сделать.concatMap ждет завершения первого звонка, затем делает второй звонок, который вам нужен.

Это ближе?

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...