Как разделить буферный наблюдаемый поток на 3 наблюдаемых и перемещать предметы между 3 наблюдаемыми? - PullRequest
0 голосов
/ 07 апреля 2020

Прежде всего, я не очень знаком с Observables, поэтому я должен попросить вас, ребята, внести ясность в мою голову. Я работаю над веб-приложением, в котором есть Angular клиент и сервер Rails. Для каждого запроса от клиента к серверу я хотел бы создать задачу и добавить ее в список задач. Это мой класс ServerTask:

export class ServerTask {

  private _state$ = new BehaviorSubject<ServerTaskState>({type:"pending"});
  readonly state$ = this._state$.asObservable();
  readonly createdAt: number;
  readonly id: string;

  constructor(readonly description: string) {
    this.id = generateUUIDv4();
    this.createdAt = Date.now();
  }

  get state() {
    return this._state$.value;
  }

  public succeeded() {
    this._state$.next({type: "success"});
    this._state$.complete();
  }

  public failed(message: string) {
    this._state$.next({type: "failure", message});
    this._state$.complete();
  }
}

Итак, в моем servertask.service.ts есть субъект, который принимает все созданные задачи:

private readonly _newTaskEvent$ = new Subject<ServerTask>();

Для буферизации фиксированного количества задач Я использую scan:

  private readonly _internalTasks$ : Observable<ServerTask[]>= this._newTaskEvent$.pipe(
    scan((acc, cur) => [...acc, cur].slice(-10), []),
    startWith([]),
    debounceTime(0),
    shareReplay(1)
  );

Оттуда я хотел бы получить список всех задач, которые имеют, например, состояние $ "в ожидании". Поэтому я создал функцию для фильтрации задач по состоянию:

private getTasks(state: ServerTaskStateType,cmap:(value:ServerTask)=>any): Observable<PublicServerTask[]> {
    return this._internalTasks$.pipe(
      mergeMap(async (tasks)=> {
        return tasks
        .filter(async (t) => {
          const promise = t.state$.pipe(first()).toPromise();
          const resolved = await promise;
          return resolved.type ===  state;})
        .map(cmap)}),
    )
  }

, и вот как я пытаюсь ее использовать:

  readonly pendingTasks$: Observable<PendingServerTask[]>  = this.getTasks("pending",(t)=>(
    {description: t.description,}
  ));
  readonly succeededTasks$: Observable<SucceededServerTask[]> = this.getTasks("success",(t)=>(
    {description: t.description,}
  ));

  readonly failedTasks$: Observable<FailedServerTask[]> = this.getTasks("failure",(t)=>(
    {description: t.description,message:t.state.message}
  ));

Когда одна задача T1 создается и отправляется в newTaskEvent $, pendingTasks$.pipe(first()).toPromise() содержит T1. Затем вызывается функция t1.succeeded (), а затем succeededTasks$.pipe(first()).toPromise() также содержит T1. Так что все правильно! Проблема заключается в том, что pendingTasks$.pipe(first()).toPromise() по-прежнему содержит t1, который теперь должен быть доступен только в последующих задачах $.

Я понятия не имею, как решить эту проблему или возможно ли с этой структурой в моем сервисе. Спасибо вам, ребята, за любую помощь!

РЕДАКТИРОВАТЬ:

Вот тест, как должен вести себя конвейер:

function firstPromise<T>(obs: Observable<T>) {
  return obs.pipe(first()).toPromise();
}

it("Single pending task later succeeds", async () => {
  const t = instantiate();
  // Create and add Task to _newTaskEvent$
  const task = mkTaskPending("t1");
  t.service.addTask(task);
  // Pending first
  const pending = await firstPromise(t.service.pendingTasks$);
  expect(pending.map((t) => t.description)).toEqual(["t1"]);
  const succeeded = await firstPromise(t.service.succeededTasks$);
  expect(succeeded.map((t) => t.description)).toEqual([]);
  // Changed to succeed
  task.succeeded();
  // Now succeeded !!--Error: Timeout - Async function did not complete--!!
  const succeededLater = await firstPromise(t.service.succeededTasks$);
  expect(succeededLater.map((t) => t.description)).toEqual(["t1"]);
  // Nothing remains pending
  const pendingLater = await firstPromise(t.service.pendingTasks$);
  expect(pendingLater.map((t) => t.description)).toEqual([]);
});

1 Ответ

0 голосов
/ 07 апреля 2020

эта проблема, кажется, возникает, потому что вы смотрите только на первое наблюдение за заданным состоянием задач, пока не добавляется новая задача во внутреннюю, и в этот момент вы смотрите снова.

Я думаю, вам нужно переписать Ваша getTasks функция выглядит следующим образом:

private getTasks(state: ServerTaskStateType,cmap:(value:ServerTask)=>any): Observable<PublicServerTask[]> {
    return this._internalTasks$.pipe(
      switchMap(tasks =>  // switchMap to cancel last subscription on new tasks
        // combine all task state observables, inner map into state and task
        combineLatest(tasks.map(t => t.state$.pipe(map(ts => [ts, t])))).pipe(
          // filter on task state and map into task / use map function
          map(taskStates => taskStates.filter(([ts, t]) => ts.type === state).map(([ts, t]) => cmap(t)))
        )
      )
    )
}

Здесь нужно понять одну вещь: это приведет к тому, что все потоки будут излучать при добавлении задачи или изменении ее состояния, даже если в этом конкретном случае нет изменений. поток задач.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...