Опрос запросов с использованием rjxs и angular - PullRequest
1 голос
/ 12 мая 2019

Я пытаюсь создать непрерывный опрос, используя rxjs и angular.Ниже приведена реализация моего требования.

Мой шаблон app.component имеет, например, 2 или более компонента (один и тот же компонент).

<widget ticker='BTC'></widget>
<widget ticker='ETH'></widget>

В widget.component я хотел бы получитьданные из API для заполнения виджета информацией о тикере, но цель состоит в том, чтобы собрать все тикеры и сделать всего один вызов, например (api / crypto / BTC, ETH) и вернуть данные всему виджету (в нашем случае 2).Каждый виджет будет считывать данные из ответа и продолжать извлекать каждую минуту.

Пример ответа:

{ BTC: { name: 'Bitcoin', price: 7000 }, ETH: { name: 'Etherium', price: 200 }}

Мой компонент виджета:

export class widgetComponent implements OnInit, OnDestroy {
  @Input() ticker: any;
  subscription: any;

  constructor(
    private cryptoService: CryptoService
  ) { }

  ngOnInit() {
    this.subscription = this.cryptoService
      .setupSymbol(this.ticker)
      .subscribe(data => {
        this.info = data[this.ticker];
      });
  }

  ngOnDestroy() {
    this.subscription.unsubscribe();
  }
}

Моя служба:

@Injectable({
  providedIn: 'root'
})
export class CryptoService {
  tickers: any = '';
  polledBitcoin$: Observable<number>;
  load$ = new BehaviorSubject('');

  constructor(
    private http: HttpClient
  ) { }

  bitcoin$ = this.http.get(`api/crypto/${this.tickers}`);

  whenToRefresh$ = of('').pipe(
    delay(1000),
    tap(_ => this.load$.next('')),
    skip(1),
  );

  poll$ = concat(this.bitcoin$, this.whenToRefresh$);

  setupTicker(ticker) {
    this.tickers += ticker + ',' ;

    return this.load$.pipe(
      concatMap(_ => this.poll$),
      share()
    );
  }

Мой код работает не так, как я ожидаю.Каждый виджет делает свой собственный вызов API для этого тикера.Но я хотел бы просто сделать одну коллекцию вызовов все тикером и поделиться запросом данных через все виджеты.

Невозможно создать поток с rxjs для одного массива, который содержит все тикеры, например ['BTC', 'ETH] и после этого начать опрос?Опрос должен ждать, пока все виджеты не сделают setupTicker.

Кто-нибудь вызывает помощь?Заранее спасибо.

1 Ответ

1 голос
/ 13 мая 2019

Имейте объект тикеров, который отслеживает, сколько виджетов отслеживает каждый символ, каждый раз, когда вы добавляете новый символ, запускайте новую подписку на опрос.

@Injectable({
  providedIn: 'root'
})
export class CryptoService {
  private tickers: { [ticker]: number } = {};
  private subscription: Subscription;

  tickers$ = new BehaviorSubject<{ [ticker]: { name: string, price: number } }>(undefined);

  constructor(
    private http: HttpClient
  ) { }

  subscribe(ticker: string) {
    if (this.tickers[ticker]) {
      this.tickers[ticker]++;
    } else {
      this.tickers[ticker] = 1;
      if (this.subscription) {
        this.subscription.unsubscribe();
      }
      this.subscription = interval(60000).pipe(
        switchMap(() => this.http.get<{ [ticker]: { name: string, price: number } }>(`api/crypto/${Object.keys(this.tickers).join(',')}`))
      ).subscribe(this.tickers$);
    }
  }

  unsubscribe(ticker: string) {
    if (this.tickers[ticker] > 1) {
      this.tickers[ticker]--;
    } else {
      delete this.tickers[ticker];
      if (Object.keys(this.tickers).length === 0) {
        this.subscription.unsubscribe();
      }
    }
  }
}

и в компоненте

export class widgetComponent implements OnInit, OnDestroy {
  @Input() ticker: string;

  ticker$ = this.cryptoService.tickers$.pipe(
    map(ticker => ticker && ticker[this.ticker])
  );

  constructor(
    private cryptoService: CryptoService
  ) { }

  ngOnInit() {
    this.cryptoService.subscribe(this.ticker);
  }

  ngOnDestroy() {
    this.cryptoService.unsubscribe(this.ticker);
  }
}

и использовать асинхронную трубу в шаблоне

<ng-content *ngIf="ticker$ | async as tickerVal">
  {{ tickerVal.name }} current price is {{ tickerVal.price }}
</ng-content>
...