Опрос в rxjs и ngrx - PullRequest
       16

Опрос в rxjs и ngrx

0 голосов
/ 27 апреля 2018

Эй, я новичок в rxjs и ngrx, и я создаю приложение, используя эти технологии. Я пытаюсь подумать о том, как создать систему Поллинга, используя наблюдаемые и операторы rxjs.

Я создал базовую систему опросов, которая содержит карту подписок наблюдаемых. каждая наблюдаемая отправка действия каждые 5 секунд для ngrx-эффектов, которые обрабатывают действие и выполняют побочные эффекты, такие как http-вызов, с использованием службы.

Моя проблема в том, что я хочу создать специальный механизм для текущей системы пула, который имеет следующие условия:

1.Первый пул происходит сразу, для этого я использую таймер (0, poolingTime), или интервал с трубой stratwith (ноль).

2.Пул знает, что должен задержать свой следующий запрос в соответствии со временем предыдущего запроса. Я имею в виду, что, когда предыдущий запрос завершен, возникает второй запрос.

Первое условие, которое я получил в одиночку, второе условие (2) Мне нужна помощь в достижении этого. Я думаю о debounce или thorder inorder, чтобы выполнить второе условие, но, как я уже сказал, во-первых, у меня нет большого опыта с rxjs.

Вот код моей простой системы объединения

import { Injectable } from '@angular/core';
import { Observable } from 'rxjs/Observable';
import { Subscription } from 'rxjs/Subscription';
import { timer } from 'rxjs/observable/timer';
import { interval } from 'rxjs/observable/interval';
import { throttleTime, debounceTime, startWith, tap, delay } from 'rxjs/operators';
import { Utils } from '../utils';
@Injectable()
export class PoolingService {

  private subscriptions: { [id: string]: Subscription };

  constructor() {
    this.subscriptions = {};
  }

  public startPooling(time: number, callback: Function): string {
    const id = Utils.guid();
    const interval$ = interval(time).pipe(tap(tick => console.log("tick", tick))).pipe(startWith(null));
    // const interval$ = timer(0, time).pipe(tap(tick => console.log("tick", tick)));

    const subscription = interval$.subscribe(() => { callback() });
    this.subscriptions[id] = subscription;
    return id;
  }

  public stopPooling(id: string) {
    const subscription = this.subscriptions[id];
    if (!subscription) {
      return;
    }
    subscription.unsubscribe();
  }

}

Вот использование Сервиса опроса:

ngOnInit() {

    this.store.select('domains').subscribe((state: any) => {
      const { list, lastAddedDomain } = state;
      this.markers = list;
      this.roots = Utils.list_to_tree(list);
    });

    this.poolService.startPooling(5000, () => {
      this.store.dispatch(new AllHttpActions.HttpActionGet({}, HttpMethods.GET, "/getDomainsForMap", AllDomainActions.FETCH_DOMAINS, Utils.guid()));
    });

  }

1 Ответ

0 голосов
/ 27 апреля 2018

Я бы, наверное, попробовал что-то подобное. Я добавил комментарии по всему коду, которые должны помочь вам понять, почему я сделал определенные вещи.

import { Injectable, OnDestroy } from '@angular/core';
import { Subject } from 'rxjs/Subject';
import { Observable } from 'rxjs/Observable';
import { timer } from 'rxjs/observable/timer';
import { interval } from 'rxjs/observable/interval';
import { startWith, tap, mergeMap, take, takeUntil, filter, map, catchError, delay } from 'rxjs/operators';
import { HttpClient } from '@angular/common/http';
import { of } from 'rxjs/observable/of';
import { Subscription } from 'rxjs/Subscription';

@Injectable()
export class PollingService implements OnDestroy {

    private destroyed$ = new Subject<any>();

    poll<PollResultType>(intervalTime: number, pollFunction: () => Observable<PollResultType>): Observable<any> {
        let isRequesting = false;
        return timer(0, intervalTime)
            .pipe(
                // When the service is destroyed, all polls will be unsubscribed from
                takeUntil(this.destroyed$)),
                tap(tick => console.log('tick', tick))),
                // Only continue if isRequesting is false
                filter(() => !isRequesting)),
                // Set isRequesting to true before requesting data
                tap(() => isRequesting = true)),
                // Execute your poll function
                mergeMap(pollFunction)),
                // Set isRequesting to false, so the next poll can come through
                tap(() => isRequesting = false)
            );
    }

    ngOnDestroy() {
        // When the service gets destroyed, all existing polls will be destroyed as well
        this.destroyed$.next();
        this.destroyed$.complete();
    }
}

// In this example this is a service. But this could also be a component, directive etc.
@Injectable()
export class ConsumerService {

    private subscription: Subscription;

    private requester: Observable<any>;

    constructor(private polling: PollingService, private http: HttpClient) {
        // Instead of calling poll and subscribing directly we do not subscribe.
        // Like that we can have a requester where we can subscribe to activate
        // the polling. You might not need that.
        this.requester = this.polling.poll(
            500,
            // This is our polling function which should return another observable
            () => this.http.get('https://cors-test.appspot.com/test')
                .pipe(
                    // Uncomment following line to add artificial delay for the request
                    // delay(2000),
                    // Don't forget to handle errors
                    catchError(error => {
                        return of('Failed');
                    })
                )
        );

        // Let's activate our poll right away
        this.activate();
    }

    activate() {
        // Deactivate on activation to deactivate any subscriptions that are already running
        this.deactivate();

        // Subscribe to activate polling and do something with the result
        this.subscription = this.requester
            // This is for testing purposes. We don't want to overload the server ;)
            .pipe(take(10))
            .subscribe(res => console.log(res));
    }

    deactivate() {
        if (this.subscription) {
            this.subscription.unsubscribe();
            this.subscription = undefined;
        }
    }
}

Может быть, стоит отметить несколько общих вещей:

  • Для запуска этого кода вам необходимо выполнить следующие действия:
  • Скопируйте код в файл ts в вашем источнике.
  • Добавьте PollingService и ConsumerService к поставщику модулей вашего приложения.
  • Добавьте ConsumerService в качестве зависимости где-нибудь, чтобы он выполнялся.
  • Я установил время опроса на 500 мс для целей тестирования.
  • В конструкторе ConsumerService есть закомментированная строка с оператором задержки. Если вы раскомментируете эту строку, вы можете смоделировать, что происходит, если выполнение запроса занимает больше времени. Вы должны увидеть эффект в консоли, если задержка больше, чем intervalTime
  • В методе ConsumerService.activate я ограничил число опросов до 10, чтобы не раздражать сервер за тестовым URL, который я вызываю.
  • Это может помочь лучше понять, что происходит, добавив tap(() => ...) с инструкциями журнала консоли между различными шагами.

Надеюсь, это поможет.

...