RxJs - альтернатива concatMap, которая отбрасывает все между - PullRequest
0 голосов
/ 12 декабря 2018

Я пытаюсь найти оператора, который ведет себя аналогично concatMap, но отбрасывает все промежуточное.Например, concatMap выполняет следующие действия:

  • next a
  • начинает обработку a
  • next b
  • next c
  • завершить обработку a
  • начать обработку b
  • завершить обработку b
  • начать обработку c
  • завершить обработку c

Вместо этого я ищу механизм, который будет сбрасывать b, поскольку c уже пришло:

  • далее
  • startобработка a
  • следующая b
  • следующая c
  • закончить обработку a
  • (пропустить b)
  • начать обработку c
  • закончить обработку c

Более подробный пример приведен в этом разделе: https://gist.github.com/Burgov/afeada0d8aad58a9592aef6f3fc98543

Ответы [ 4 ]

0 голосов
/ 14 декабря 2018

Я думаю, что вы ищете оператор throttle.

Вот рабочий Stackblitz .Ключом к выполнению этой работы является установка объекта конфигурации, который передается в throttle(), что позволяет ему излучать (и обрабатывать) выбросы как ведущего, так и конечного источника, но игнорировать любые промежуточные выбросы в течение времени, в течение которого processData() работает.

Вот ключевая функция из Stackblitz:

// Use 'from' to emit the above array one object at a time
const source$ = from(sourceData).pipe(

  // Simulate a delay of 'delay * delayInterval' between emissions
  concatMap(data => of(data).pipe(delay(data.delay * delayInterval))),

  // Now tap in order to show the emissions on the console.
  tap(data => console.log('next ', data.emit)),

  // Finally, throttle as long as 'processData' is handling the emission
  throttle(data => processData(data), { leading: true, trailing: true }),

).subscribe()

Коротко и красиво, и работает как требуется, за исключением одного выпуска ...

Обновление:

Эта «одна проблема» с приведенным выше кодом состоит в том, что, когда исходная наблюдаемая завершается, throttle() отписывается от processData, фактически останавливая любую окончательную обработку, которая должна была быть выполнена.Исправление, как указал Барт ван ден Бург в комментариях ниже, заключается в использовании предмета.Я полагаю, что есть много способов сделать это, но Stackblitz был обновлен следующим кодом, который теперь работает:

// Set up a Subject to be the source of data so we can manually complete it
const source$ = new Subject();

// the data observable is set up just to emit as per the gist.
const dataSubscribe = from(sourceData).pipe(
    // Simulate a delay of 'delay * delayInterval' before the emission
    concatMap(data => of(data).pipe(delay(data.delay * delayInterval))),
).subscribe(data => {
    console.log('next ', data.emit); // log the emission to console
    source$.next(data); // Send this emission into the source
});

// Finally, subscribe to the source$ so we can process the data
const sourceSubscribe = source$.pipe(
    // throttle as long as 'processData' is handling the emission
    throttle(data => processData(data), { leading: true, trailing: true })
).subscribe(); // will need to manually unsubscribe later ...
0 голосов
/ 12 декабря 2018

Это самое простое решение, которое мне удалось сделать:

const source = new Subject();
const start = new Date();
const mockDelayedObs = val => of(val).pipe(delay(1200));

source.pipe(
  multicast(
    new ReplaySubject(1),
    subject => {
      let lastValue;

      return subject.pipe(
        filter(v => v !== lastValue),
        exhaustMap(v => {
          lastValue = v;
          return mockDelayedObs(v);
        }),
        take(1),
        repeat(),
      );
    }
  ),
)
.subscribe(v => {
  console.log(new Date().getTime() - start.getTime(), v);
});

setTimeout(() => source.next(0), 0);
setTimeout(() => source.next(1), 500);
setTimeout(() => source.next(2), 1000);
setTimeout(() => source.next(3), 1500);
setTimeout(() => source.next(4), 1800);
setTimeout(() => source.next(5), 4000);

Демонстрация в реальном времени: https://stackblitz.com/edit/rxjs-z33jgp?devtoolsheight=60

Порядок действий такой:

next 0
start handling 0
next 1
next 2
finish handling 0
start handling 2
next 3
next 4
finish handling 2
start handling 4
finish handling 4
start handling 5
finish handling 4

Таким образом, будут напечатаны только 0, 2, 4 и 5

Это будет работать и без оператора multicast, но я хотел избежать утечки переменных состояния.Кажется, что без них невозможно полностью, поэтому есть только один lastValue.Эта переменная используется только для игнорирования вызова mockDelayedObs для одного и того же значения дважды после повторной подписки на одну и ту же цепочку с помощью repeat().

0 голосов
/ 12 декабря 2018

Фу, это было трудно взломать:

https://stackblitz.com/edit/angular-yk7akk

Итак, в основном я создаю 2 наблюдаемые:

  • немедленные элементы - это элементы, которые могутвыполняется немедленно
  • postponedItems основан на lastFinished $.Он будет испускать последний элемент, выполнение которого было запрещено.

concatMap затем работает с объединением этих 2 наблюдаемых ..

Это работает, как описано, но это не совсем легкоили простой метод (императивный запах кода).Я слежу за этим обсуждением для более элегантных решений.

0 голосов
/ 12 декабря 2018

Может быть, вы могли бы попытаться использовать метод race на b и c сразу после выполнения mergeMap на a?

Я бы выглядел примерно так:

a.pipe(
  mergeMap(AResult => {
     // store aResult
     return race(b,c)
  }
).subscribe(
   finalResult => {
      // final result corresponding to either b or c
   }
)

Это будет работать, если у вас уже есть определенное количество вызовов для выполнения после a.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...