Почему оператор takeWhile не фильтрует при включении? - PullRequest
2 голосов
/ 23 сентября 2019

Я пытаюсь использовать оператор takewhile с включенной опцией true, и я сталкиваюсь с поведением, которое не понимаю.Мне удалось выделить небольшой фрагмент кода, в котором я могу воспроизвести поведение здесь

import { from, BehaviorSubject } from 'rxjs'; 
import { map, takeWhile } from 'rxjs/operators';

const value$ = new BehaviorSubject<number>(1);

const source = value$.pipe(
  map(x => `value\$ = ${x}`),
  takeWhile(x => !x.includes('4'), /*inclusive flag: */true)
);

source.subscribe(x => {
  console.log(x); 
  value$.next(4); // Strange behavior only in this case
  });

объяснение: без включающего флага регистрируется «значение $ = 1», и поток завершается

НО, с включенным флагом true, он падает с исключением переполнения стека enter image description here

Мой вопрос: почему вместо этого он проходит через takeWhile более одного раза?остановки после первого вхождения?

вот ссылка на скамейку, когда-либо это помогает понять: https://stackblitz.com/edit/rxjs-ag4aqx

1 Ответ

0 голосов
/ 24 сентября 2019

Немного покопавшись в исходном коде оператора (https://github.com/ReactiveX/rxjs/blob/master/src/internal/operators/takeWhile.ts),, действительно, что-то не так, я собираюсь сообщить о проблеме в github.

Тем временем, здесь есть фиксированный кастомОператор takeWhileInclusive

import { from, BehaviorSubject, Observable } from 'rxjs';
import { map, takeWhile } from 'rxjs/operators';

/** Custom takewhile inclusive Custom takewhile inclusive properly implemented */
const customTakeWhileInclusive = <T>(predicate: (value: T) => boolean) => (source: Observable<T>) => new Observable<T>(observer => {
  let lastMatch: T | undefined // fix
  return source.subscribe({
    next: e => {
      if (lastMatch) {
        observer.complete();
      }
      else {
        if (predicate(e)) {
          /*
           *   Code from https://github.com/ReactiveX/rxjs/blob/master/src/internal/operators/takeWhile.ts
           *  
           *   if (this.inclusive) {
           *      destination.next(value); // NO! with a synchronous scheduler, it will trigger another iteration without reaching the next "complete" statement 
           *      and there is no way to detect if a match already occured!
           *   }
           *   destination.complete();
           */

          // Fix:
          lastMatch = e; // prevents from having stackoverflow issue here
        }

        observer.next(e);
      }
    },
    error: (e) => observer.error(e),
    complete: () => observer.complete()
  });
});

const value$ = new BehaviorSubject<number>(1);

const source = value$.pipe(
  map(x => `value\$ = ${x}`),
  //takeWhile(x => !x.includes('4'), true)
  customTakeWhileInclusive(x => x.includes('4'))  // fix
);

source.subscribe(x => {
  console.log(x);
  value$.next(4);
});

Проблема с действительным оператором заключается в том, что в контексте синхронного планировщика, когда происходит совпадение, он запускает другую итерацию и никогда не достигает "завершено". Правильная реализация должна была бы пометить совпадение ивыполните еще одну последнюю итерацию, в которой вы завершите обнаружение флага.

Ссылка на обновленный стек стека: https://stackblitz.com/edit/rxjs-ag4aqx

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...