Объединение 2 наблюдаемых RxJs в отображаемую наблюдаемую - PullRequest
1 голос
/ 04 мая 2019

любая помощь приветствуется - я несколько новичок в RxJ, но я думаю, что у меня есть существенное недопонимание о том, как сопоставляются наблюдаемые.

Есть две наблюдаемые, которые я сохраняю через NGRX для одной учетной записи «ReceiveMessages»и "отправленные сообщения".каждый из них является просто массивом «MessageModel», который содержит toUsername и fromUsername.

Я пытаюсь объединить эти две наблюдаемые в одну наблюдаемую, чтобы я мог отобразить «MessageThreads» со всем соответствием между этими двумя пользователями, имея массив такого объекта с «username: в качестве уникального ключа почтикак HashMap, это выглядит так: {username: string, thread: MessageModel[]}

Я пытался использовать forkJoins, Concats, Merge, но я получаю странную ошибку. Это заставляет меня думать, что я, вероятно, не понял, что такое ngrx или rxjsна самом деле делает: (

inbox-container.component.ts


 import { Component, OnInit } from '@angular/core';
import { AppState } from 'src/app/store/app.reducers';
import { Store } from '@ngrx/store';
import { MessageModel } from '../../shared/models/message.model';
import { Observable, forkJoin, of, from, pipe } from 'rxjs';
import { map, concat, combineLatest, mergeMap, flatMap, switchMap, mapTo, merge } from 'rxjs/operators';
import { stringify } from '@angular/compiler/src/util';

@Component({
  selector: 'app-inbox-container',
  templateUrl: './inbox-container.component.html',
  styleUrls: ['./inbox-container.component.css']
})
export class InboxContainerComponent implements OnInit {

  receivedMessages: Observable<MessageModel[]>;
  sentMessages: Observable<MessageModel[]>;

  uniqueThreads: Observable<{ username: string, thread: MessageModel[] }[]>;

  constructor(private store: Store<AppState>) {
    this.receivedMessages = this.store.select('inbox', 'receivedMessages');
    this.sentMessages = this.store.select('inbox', 'sentMessages');
  }

  ngOnInit() {
    this.uniqueThreads =
      of(
        forkJoin(
          this.receivedMessages,
          this.sentMessages
        ),
        map((messages: MessageModel[]): Observable<{ username: string, thread: MessageModel[] }[]> => {
          let mergedThreads: { username: string, thread: MessageModel[] }[] = [];
          messages.forEach((message) => {
            let activeThreadFrom = mergedThreads.find(any => any.username === message.fromUsername)
            if (activeThreadFrom === null) {
              mergedThreads.push({ username: message.fromUsername, thread: [message] })
            } else {
              activeThreadFrom.thread.push(message);
            }
            mergedThreads.push(activeThreadFrom);

            let activeThreadTo = mergedThreads.find(any => any.username === message.toUsername)
            if (activeThreadTo === null) {
              mergedThreads.push({ username: message.toUsername, thread: [message] })
            } else {
              activeThreadTo.thread.push(message);
            }
            mergedThreads.push(activeThreadTo)

          })
          return of(
            mergedThreads
          )
        }
        )
      )
  }

}


inbox-container.component.html


<div class="col-xs-12">
  <div class="col-xs-4">
    <div class="list-group">
      <a class="list-group-item" routerLinkActive="active" routerLink="compose">
        <h4 class="list-group-item-heading">
          <div style="margin-top: 10px; margin-right: 10px; padding-right:10px;" class="dropdown" appSearchBarDirective
            [text]=searchText.value>
            <input class="form-control" type="text" placeholder="Username.." aria-label="Search" #searchText>
            <div class="dropdown-menu">
              <app-reactive-accounts-search [searchForm]=searchText.value></app-reactive-accounts-search>
            </div>
          </div>
        </h4>
      </a>
      <div class="list-group">
        <div class="list-group-item" *ngFor="let thread of ( uniqueThreads | async)" appHighlightDirective
          style="cursor: pointer;">
          <div class="list-group-item-heading">
            <h4>{{ thread.username }}
              <div class="badge pull-right">4</div>
            </h4>
          </div>
        </div>
      </div>
    </div>
  </div>
  <div class="col-xs-8">
    <div class="panel panel-default">
      <router-outlet></router-outlet>
    </div>
  </div>
  <button (click) = "wtf()">asdasd</button>
</div>

Я получаю сообщение об ошибке вжурналы консоли:

InboxContainerComponent.html:10 ERROR Error: Cannot find a differ supporting object 'function mapOperation(source) {
        if (typeof project !== 'function') {
            throw new TypeError('argument is not a function. Are you looking for `mapTo()`?');
        }
        return source.lift(new MapOperator(project, thisArg));
    }' of type 'mapOperation'. NgFor only supports binding to Iterables such as Arrays.
    at NgForOf.push../node_modules/@angular/common/fesm5/common.js.NgForOf.ngDoCheck (common.js:3184)
    at checkAndUpdateDirectiveInline (core.js:22101)
    at checkAndUpdateNodeInline (core.js:23362)
    at checkAndUpdateNode (core.js:23324)
    at debugCheckAndUpdateNode (core.js:23958)
    at debugCheckDirectivesFn (core.js:23918)
    at Object.eval [as updateDirectives] (InboxContainerComponent.html:16)
    at Object.debugUpdateDirectives [as updateDirectives] (core.js:23910)
    at checkAndUpdateView (core.js:23306)
    at callViewActi

А также предупреждение в IDE:


Type 'Observable<Observable<[MessageModel[], MessageModel[]]> | OperatorFunction<MessageModel[], Observable<{ username: string; thread: MessageModel[]; }[]>>>' is not assignable to type 'Observable<{ username: string; thread: MessageModel[]; }[]>'.
  Type 'Observable<[MessageModel[], MessageModel[]]> | OperatorFunction<MessageModel[], Observable<{ username: string; thread: MessageModel[]; }[]>>' is not assignable to type '{ username: string; thread: MessageModel[]; }[]'.
    Type 'Observable<[MessageModel[], MessageModel[]]>' is missing the following properties from type '{ username: string; thread: MessageModel[]; }[]': l

Любая помощь будет принята с благодарностью!

- РЕШЕНИЕ ---

Я просто привязан к состоянию хранилища NGRX, и, как сказал ответ, нет необходимости создавать дополнительные наблюдаемые.


 init() {
    this.uniqueThreads = this.store.select('inbox').pipe(
      map((state: fromInbox.State) => {
        return [
          state.receivedMessages,
          state.sentMessages
        ]
      }),
      ).pipe(
        map(([receivedMessages, sentMessages]) => {
          let mergedThreads: {username: string, thread: { message: MessageModel, received: boolean }[]}[] = [];
          const messages: MessageModel[] = [...receivedMessages, ...sentMessages];
          messages.forEach((message) => {
            console.log(message)
            let activeThreadFrom = mergedThreads.find(any => any.username === message.fromUsername)
            if (activeThreadFrom === undefined) {
              mergedThreads.push({ username: message.fromUsername, thread: [{message: message, received: true}] })
            } else {
              activeThreadFrom.thread = [ ...activeThreadFrom.thread, {message: message, received: true}];
            };

            let activeThreadTo = mergedThreads.find(any => any.username === message.toUsername)
            if (activeThreadTo === undefined) {
              mergedThreads.push({ username: message.toUsername, thread: [{message: message, received: false}] })
            } else {
              activeThreadTo.thread = [ ...activeThreadTo.thread, {message: message, received: false}];
            }
          })
          console.log(mergedThreads)
          return mergedThreads;
        })
      );
  }


1 Ответ

0 голосов
/ 04 мая 2019

Привет, я не вижу смысла использовать of здесь, и вы используете forkJoin и map неправильно внутри него, когда вы объединяете две или более наблюдаемых через forkJoin, он уже возвращает наблюдаемую, поэтому вы можно использовать .pipe на нем и сделать карту внутри него, см. мой пример

ngOnInit() {
  this.uniqueThreads =
    forkJoin(
      this.receivedMessages,
      this.sentMessages
    ).pipe(
      map(([receivedmessages, sentMessages]): Observable<{ username: string, thread: MessageModel[] }[]> => {
        let mergedThreads: { username: string, thread: MessageModel[] }[] = [];
        const messages: MessageModel[] = [...receivedmessages, ...sentMessages];
        messages.forEach((message) => {
          let activeThreadFrom = mergedThreads.find(any => any.username === message.fromUsername)
          if (activeThreadFrom === null) {
            mergedThreads.push({ username: message.fromUsername, thread: [message] })
          } else {
            activeThreadFrom.thread.push(message);
          }
          mergedThreads.push(activeThreadFrom);

          let activeThreadTo = mergedThreads.find(any => any.username === message.toUsername)
          if (activeThreadTo === null) {
            mergedThreads.push({ username: message.toUsername, thread: [message] })
          } else {
            activeThreadTo.thread.push(message);
          }
          mergedThreads.push(activeThreadTo)
        })
        return mergedThreads;
      })
    );
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...