Как динамически изменить интервал потока, когда другой поток испускает - PullRequest
0 голосов
/ 05 сентября 2018

Итак, у меня есть поток из массива строк.

const msgs = ['Searching...', 'Gathering the Data...', 'Loading the Dashboard...'];
const msgs$ = Observable.from(msgs);

Мне нужно отправлять одно из этих сообщений последовательно каждые 3 секунды до тех пор, пока не издастся поток dataFetched$, что в основном означает, что все данные поступили на страницу. Я не могу перебрать их. Как только последнее сообщение увидено, но данные не поступили, оно не должно измениться.

Я могу просмотреть каждую строку в msgs с течением времени с помощью функции RxJS zip

const longInt$: Observable<number> = Observable.interval(3000);
const longMsgsOverTime$ = Observable.zip(msgs$, longInt$, msg => msg);

Затем, когда выходит поток dataFetched$, мне нужно переключиться на shortInt$, то есть

    const longInt$: Observable<number> = Observable.interval(1500);

И продолжайте отображать сообщения о загрузке, пока не появится последнее сообщение. Каждое сообщение должно быть просмотрено только один раз.

Для меня это довольно утомительно - я могу отметить некоторые требования, но не заставить их работать полностью. После многих попыток я пришел к выводу, что нам нужно обернуть массив msgs в тему, чтобы предотвратить повторение всех этих циклов после того, как мы переключились с longInt$ на shortInt$.

=================== EDIT ===================== После ответа и кода ggradnig я придумал это уродство (для целей отладки):

    setLoaderMsg(){
    console.log('%c setting loader msg', 'border: 10px dotted red;');
    const msgs$ = Observable.from(['Searching...', 'Gathering the Data...', 'Loading the Dashboard...', 'Something something...', 'yet another msg...', 'stop showing me...']),
        shortInt$ = Observable.interval(250).pipe(
            tap(v => console.log('%c short v', 'background: green;',v))
        ),
        longInt$ = Observable.interval(3000).pipe(
            tap(v => console.log('%c long v', 'background: red;',v)),
            takeUntil(this.dataFetched$)
        ),
        // interval$ = Observable.interval(3000).pipe(takeUntil(this.dataFetched$)).pipe(concat(Observable.interval(250))),
        interval$ = longInt$.pipe(concat(shortInt$));

    Observable.zip(msgs$, interval$, msg => msg)
    // Observable.zip(msgs$, interval$)
        .pipe(tap(v => {
            console.log('%c v', 'background: black; border: 1px solid red; color: white;', v);
            this.loaderMsg = v;
        }))
        .subscribe(
            () => {},                       //next
            () => {},                       //error
            // () => this.loading = false       //complete
            () => {
                console.log('%c complete', 'background: yellow; border: 2px solid red');
                // this.loading = false;
            }
        );
}

Ниже приведен скриншот моей хромированной консоли, чтобы увидеть, что происходит. Поток shortInt$ будет регистрировать зеленое сообщение, поскольку вы можете видеть, что этого никогда не происходит, даже если поток dataFetched$ испускает (оранжевая пунктирная граница). Под журналом «fetch data emit» мы должны увидеть зеленые фоновые сообщения, которые будут означать, что поток shortInt $ испускает значения.

enter image description here

================================= 2-е РЕДАКТИРОВАНИЕ ============ ============

Ниже dataFetched$ наблюдается:

fetchData(){
    console.log('%c fetching now', 'border: 2px dashed purple');

    // this.initLoader();

    this.dataFetched$ = Observable.combineLatest([
        this.heroTableService.getHeroTableData([SubAggs.allNetworks.key], AggFieldsMap.legends.name),
        this.researchService.benchmark(this.getSubAggsForOverviewTbl()),
        this.multiLineChartService.getEngagementOverTime(this.getSubAggsForNetworkEngagementsOverTimeTable())
    ]).pipe(
        share(),
        delay(7000),
        tap(v => {
            console.log('%c fetch data emit', 'border: 2px dashed orange', v);
        })
    );


    const sub = this.dataFetched$.subscribe(([heroTableRes, benchmarkRes, netByEngRes]: [ITable[], IBenchmarkResponse, any]) => {
         // this.loading = false; ==> moved to dashboard

         //xavtodo: split this shit into .do() or .tap under each http request call
         //hero table logic here
         this.heroTableData = heroTableRes;

         //////////////////////////////////////////////////////
         //engagement-by-network-table
         this.engagementByNetworkData = this.researchService.extractDataForEngagementByNetworkTable(benchmarkRes, this.getSubAggsForOverviewTbl());

         //////////////////////////////////////////////////////
         //network eng over time logic here MULTI LINE CHART
         this.engagementMultiLineData = this.multiLineChartService.extractEngagementData(netByEngRes, this.getSubAggsForNetworkEngagementsOverTimeTable());
         this.networks = this.multiLineChartService.getNetworks();
         this.multilineChartEngagements = Util.getNetworksWithAltNames(this.networks);
         this.publishedContentData = this.multiLineChartService.extractPublishedData(netByEngRes);

         //combined multiline chart
         this.multiLineChartData = {
             publishedData: this.publishedContentData,
             engagementData: this.engagementMultiLineData
         };
     });

    this.addSubscription(sub);
}

1 Ответ

0 голосов
/ 05 сентября 2018

Похоже на задачу для takeUntil и concat.

Первая наблюдаемая longInt$. Это завершается, как только dataFetched$ излучает. Для этого требования вы можете использовать takeUntil. Он принимает другую наблюдаемую в качестве параметра, и как только эта наблюдаемая излучает, исходная наблюдаемая завершится.

Из документов:

Издает значения, испускаемые источником Observable, пока notifier Observable не издаст значение.

Итак, первым шагом будет:

const longInt$: Observable<number> = Observable.interval(3000)
    .pipe(takeUntil(dataFetched$));

Далее вы хотите concat вашу другую наблюдаемую shortInt$. Concat позволит завершить первое наблюдаемое, а затем - второе наблюдаемое.

Из документов:

Создает выходной Observable, который последовательно генерирует все значения из данного Observable, а затем переходит к следующему.

Это будет выглядеть примерно так:

const longMsgsOverTime$ = Observable.zip(msgs$, 
    longInt$.pipe(concat(shortInt$)), msg => msg);

Примечание: Если вы используете RxJS 5 или ниже, замените оператор pipe (..) соответствующим оператором. Например, .pipe(concat(...)) превращается в .concat(...)

Вот пример на StackBlitz: https://stackblitz.com/edit/angular-mtyfxd

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...