Как я могу сделать RxJS Observable emit в определенные даты? - PullRequest
0 голосов
/ 31 октября 2018

У меня есть RxJS Observable , который необходимо пересчитывать в определенное время, как описано в массиве DateTime объектов (хотя для целей этого вопроса они могут быть JavaScript Date объекты, эпохи миллисекунд или что-либо еще, представляющее определенный момент времени):

const changeTimes = [
    //            yyyy, mm, dd, hh, mm
    DateTime.utc( 2018, 10, 31, 21, 45 ),
    DateTime.utc( 2018, 10, 31, 21, 50 ),
    DateTime.utc( 2018, 10, 31, 22, 00 ),
    DateTime.utc( 2018, 10, 31, 23, 00 ),
    DateTime.utc( 2018, 10, 31, 23, 30 ),
];

Я изо всех сил пытаюсь понять, как создать Observable, который будет излучать в моменты времени, указанные в таком массиве.

Вот что я подумала, пытаясь ответить на свой вопрос:

  • Мне почти наверняка нужно использовать оператор delay , где указанная задержка - это время между «сейчас» и следующей будущей датой.
  • Мне как-то нужно убедиться, что «сейчас» актуально во время подписки, а не во время создания Наблюдаемого - возможно, с помощью оператора defer - хотя я не хочу излишне создавать несколько наблюдаемых экземпляров, если существует несколько подписок.
  • Я не уверен, как перебирать массив с течением времени - оператор expand может быть тем, что мне нужно, но он вызывает что-то рекурсивно , и я просто пытаюсь перебрать список.
  • Оператор timer кажется неактуальным, так как продолжительность между каждой датой-временем различна.
  • Я мог бы сопоставить каждую дату-время с ее собственной задержанной Наблюдаемой и вернуть их все через merge, но это становится ужасно неэффективным, так как число дат-таймов в массиве увеличивается (их может быть сотни), поэтому это абсолютное последнее средство.

Как я могу сделать наблюдаемую RxJS, которая принимает список дат и затем генерирует их по мере достижения во времени, заканчивая на последнем?

Ответы [ 4 ]

0 голосов
/ 31 октября 2018

Учитывая массив из DateTime объектов:

const changeTimes = [
    //            yyyy, mm, dd, hh, mm
    DateTime.utc( 2018, 10, 31, 21, 45 ),
    DateTime.utc( 2018, 10, 31, 21, 50 ),
    DateTime.utc( 2018, 10, 31, 22, 00 ),
    DateTime.utc( 2018, 10, 31, 23, 00 ),
    DateTime.utc( 2018, 10, 31, 23, 30 ),
];

или, что еще лучше, Observable, который генерирует их каждый раз при изменении массива (что на самом деле происходит в моем сценарии, хотя я не упомянул об этом в вопросе, потому что он не был строго релевантным):

const changeTimes$: Observable<DateTime[]> = /* ... */;

следующая Observable немедленно выдаст следующее будущее время по подписке, испускает каждое последующее будущее время по прошествии предыдущего будущего времени, затем завершается с null:

const nextTime$ = changeTimes$.pipe(
    // sort DateTimes chronologically
    map(unsorted => [...unsorted].sort((x, y) => +x - +y),
    // remove duplicates
    map(duplicated => duplicated.filter((item, i) => !i || +item !== +duplicated[i - 1])),
    // convert each time to a delayed Observable
    map(times => [...times, null].map((time, i) => defer(() => of(time).pipe(
        // emit the first one immediately
        // emit all others at the previously emitted time
        delay(i === 0 ? 0 : +times[i - 1] - +DateTime.utc())
    )))),
    // combine into a single Observable
    switchMap(observables => concat(...observables)),
);
  • Сортировка необходима, поскольку каждый внутренний Observable («подождите X миллисекунд, затем сообщите время») подписывается по завершении предыдущего.
  • Удаление дубликатов не является строго необходимым, но, кажется, уместно.
  • defer используется для расчета текущего времени при подписке на внутреннюю Наблюдаемую.
  • concat используется для последовательного выполнения каждого внутреннего наблюдаемого (благодаря martin ), что позволяет избежать накладных расходов на подписку для каждого времени в списке одновременно.

То, как это удовлетворяет мою первоначальную потребность:

У меня есть наблюдаемая RxJS, которую нужно пересчитывать в определенное время, как описано массивом объектов DateTime ...

- если я объединю это с данными, требующими пересчета, используя оператор combineLatest для запуска этого пересчета в правильное время:

const timeAwareData$ = combineLatest(timeUnawareData$, nextTime$).pipe(
    tap(() => console.log('either the data has changed or a time has been reached')),
    // ...
);

Что мне не нравится в моем решении

Создает отдельную внутреннюю Наблюдаемую одновременно для каждого времени в списке. Я чувствую, что возможно сделать рефакторинг таким образом, что каждый внутренний Observable создается только после уничтожения предыдущего. Будут с благодарностью приняты любые советы по улучшению.

0 голосов
/ 31 октября 2018

Я думаю, что все, что вы кратко изложили в пунктах, обозначенных пулей, правильно. Использование delay кажется очевидным, но это затруднит понимание цепочки.

Решение, которое приходит мне в голову, предполагает, что вы знаете массив changeTimes до создания наблюдаемой цепочки. Вы можете создать свой собственный "метод создания наблюдаемых", который будет возвращать наблюдаемый, который излучает, например, на основе setTimeout (это просто "псевдокод", дата не рассчитывается должным образом):

const schedule = (dates: Date[]): Observable<Date> => new Observable(observer => {
  // Sort the `dates` array from the earliest to the latest...

  let index = 0;
  let clearTimeout;

  const loop = () => {
    const now = new Date();
    const delay = dates[index] - now;

    clearTimeout = setTimeout(() => {
      observer.next(dates[index++]);

      if (index < dates.length) {
        loop();
      }
    }, delay);
  }

  loop();

  return () => clearTimeout(clearTimeout);
}); 

...

schedule(changeTimes)
  .subscribe(...)

Последний вариант, который вы упоминаете с merge, на самом деле не так уж и плох. Я понимаю, что вы обеспокоены тем, что это создаст много подписок, но если вы отсортируете массив changeTimes, а затем используете concat вместо merge, он всегда будет сохранять только одну активную подписку, даже если вы создадите сотни Наблюдаемые.

0 голосов
/ 31 октября 2018

Вот рабочий пример:

import {Injectable} from '@angular/core';
import {Observable, Subject, timer} from 'rxjs';

@Injectable()
export class TimerService {

  futureDates: Date[] = [];
  futureDate: Date;
  notifier: Observable<string>;

  cycle = (observer) => {
    if (this.futureDates.length > 0) {
      this.futureDate = this.futureDates.shift();

      const msInFuture = this.futureDate.getTime() - Date.now();
      if (msInFuture < 0) {
        console.log(`date ${this.futureDate.toISOString()}
            expected to be in the future, but was ${msInFuture} msec in the past, so stopping`);

        observer.complete();
      } else {
        timer(msInFuture).subscribe(x => {
          observer.next(`triggered at ${new Date().toISOString()}`);
          this.cycle(observer);
        });
      }
    } else {
        observer. complete();
    }
  }

  getTimer(): Observable<string> {
    const now = new Date();
    const ms1 = now.getTime() + 10000;
    const ms2 = now.getTime() + 20000;
    this.futureDates.push(new Date(ms1));
    this.futureDates.push(new Date(ms2));

    this.notifier = new Observable(observer => {
      this.cycle(observer);
    });

    return this.notifier;
  }
}

В этом примере список будущих времен создается в методе getTimer(), но вы можете передать массив дат в этот метод.

Ключ заключается в том, чтобы просто хранить даты, обрабатывать по одной за раз и во время обработки, проверять, как далеко в будущем будет эта дата, и установить таймер приема Rx для одного числа миллисекунд.

0 голосов
/ 31 октября 2018
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...