RXJS разделить по идентификатору и обработать последовательно для каждого идентификатора - PullRequest
0 голосов
/ 04 сентября 2018

Проблема: Игра: Итак, у меня есть несколько кораблей, которые могут прибыть на многие планеты. Если 2 корабля прибывают одновременно на новую планету, это может привести к одному и тому же процессу смены владельца дважды. Этот процесс является асинхронным и должен происходить только один раз за смену владельца планеты.

Чтобы это исправить, я хочу разделить поток кораблей по идентификатору планеты, чтобы каждый поток был только для одной планеты. Сложность в том, что каждый корабль должен обрабатываться только после того, как был обработан предыдущий.

  • доставка $
  • Разделить по идентификатору планеты
    • планета id1: процесс в последовательности
    • планета id2: процесс в последовательности
    • ...

Вот код, который покажет, как он должен себя вести.

const ships = [
  {
    id: 1,
    planetId: 1,
  },
  {
    id: 2,
    planetId: 1,
  },
  {
    id: 3,
    planetId: 2,
  },
  // ... never finishes 
]
// the source observable never finishes 
const source$ = interval(1000).pipe(
  take(ships.length),
  map(i => ships[i]),
)

const createSubject = (ship) => {
  // Doesn't need to be a subject, but needs to emit new items after a bit of time based on some other requests.
  console.log(`>>>`, ship.id);
  const subject = new Subject();
  setTimeout(() => {
    subject.next(ship.id + ' a' + new Date());
  }, 1000);
  setTimeout(() => {
    subject.next(ship.id + ' b' + new Date());
    subject.complete();
  }, 2000);
  return subject.asObservable();
}

// The result should be the following (t, is the time in seconds, t3, is time after 3 seconds)
// t0: >>> 1
// t0: >>> 3
// t1: 1 a
// t1: 2 a
// t2: 1 b
// t2: 2 b
// t2: >>> 2 (note that the second ship didn't call the createSubject until the first finished)
// t3: 1 a
// t4: 1 2

Решение (с большой помощью А.Виннена и некоторыми выяснениями)

Запустите его здесь: https://stackblitz.com/edit/angular-8zopfk?file=src/app/app.component.ts

  const ships = [
    {
      id: 1,
      planetId: 1,
    },
    {
      id: 2,
      planetId: 1,
    },
    {
      id: 3,
      planetId: 2,
    }
  ];
  const createSubject = (ship) => {
    console.log(ship.id + ' a')
    const subject = new Subject();
    setTimeout(() => {
      //subject.next(ship.id + ' b');
    }, 500);//
    setTimeout(() => {
      subject.next(ship.id + ' c');
      subject.complete();//
    }, 1000);
    return subject.asObservable();
  }
  let x = 0;
  interval(10).pipe(//
    take(ships.length),
    map(i => ships[i]),
    groupBy(s => s.planetId),
    mergeMap(group$ => {//
      x++
      return group$.pipe(
        tap(i => console.log('x', i, x)),
        concatMap(createSubject)
      )
    }),
  ).subscribe(res => console.log('finish', res), undefined, () => console.log("completed"))

Как это можно сделать в rxjs?

Код:

  const shipArriveAction$ = action$.pipe<AppAction>(
    ofType(ShipActions.arrive),
    groupBy(action => action.payload.ship.toPlanetId),
    mergeMap((shipByPlanet$: Observable<ShipActions.Arrive>) => {
      return shipByPlanet$.pipe(
        groupBy(action => action.payload.ship.id),
        mergeMap((planet$) => {
          return planet$.pipe(
            concatMap((action) => {
              console.log(`>>>concat`, new Date(), action);
              // this code should be called in sequence for each ship with the same planet. I don't need only the results to be in order, but also this to be called in sequence.
              const subject = new Subject();
              const pushAction: PushAction = (pushedAction) => {
                subject.next(pushedAction);
              };
              onShipArriveAction(state$.value, action, pushAction).then(() => {
                subject.complete();
              });
              return subject.asObservable();
            }),
          )
        })
      );

    )
  ;

Код от A.Winnen очень близок, но работает только с видимым источником, который завершен, а не непрерывен:

    const ships = [
      {
        id: 1,
        planetId: 1,
      },
      {
        id: 2,
        planetId: 1,
      },
      {
        id: 3,
        planetId: 2,
      }
    ];
    const createSubject = (ship) => {
      console.log(ship.id + ' a')
      const subject = new Subject();
      setTimeout(() => {
        subject.next(ship.id + ' b');
      }, 1000);//
      setTimeout(() => {
        subject.next(ship.id + ' c');
        subject.complete();//
      }, 2000);
      return subject.asObservable().pipe(
        finalize(null)
      );
    }

    interval(1000).pipe(
      take(ships.length),
      tap(console.log),
      map(i => ships[i]),
      groupBy(s => s.planetId),
      mergeMap(group => group.pipe(toArray())),
      mergeMap(group => from(group).pipe(
        concatMap(createSubject)
      ))
    ).subscribe(res => console.log(res), undefined, () => console.log("completed"))

1 Ответ

0 голосов
/ 04 сентября 2018

вы можете использовать комбинацию groupBy и mergeMap для достижения вашей цели.

from(ships).pipe(
  groupBy(ship => ship.planetId),
  mergeMap(planetGroup => planetGroup.pipe(
    concatMap(ship => {
      // do real processing in this step
     return of(`planetGroup: ${planetGroup.key} - processed ${ship.ship}`);
    })
  ))
).subscribe(result => console.log(result));

Я сделал простой пример: https://stackblitz.com/edit/angular-6etaja?file=src%2Fapp%2Fapp.component.ts

EDIT: обновлен блицстек: https://stackblitz.com/edit/angular-y7znvk

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...