Учитывая массив из DateTime
объектов:
const changeTimes = [
// yyyy, mm, dd, hh, mm
DateTime.utc( 2018, 10, 31, 21, 45 ),
DateTime.utc( 2018, 10, 31, 21, 50 ),
DateTime.utc( 2018, 10, 31, 22, 00 ),
DateTime.utc( 2018, 10, 31, 23, 00 ),
DateTime.utc( 2018, 10, 31, 23, 30 ),
];
или, что еще лучше, Observable, который генерирует их каждый раз при изменении массива (что на самом деле происходит в моем сценарии, хотя я не упомянул об этом в вопросе, потому что он не был строго релевантным):
const changeTimes$: Observable<DateTime[]> = /* ... */;
следующая Observable немедленно выдаст следующее будущее время по подписке, испускает каждое последующее будущее время по прошествии предыдущего будущего времени, затем завершается с null
:
const nextTime$ = changeTimes$.pipe(
// sort DateTimes chronologically
map(unsorted => [...unsorted].sort((x, y) => +x - +y),
// remove duplicates
map(duplicated => duplicated.filter((item, i) => !i || +item !== +duplicated[i - 1])),
// convert each time to a delayed Observable
map(times => [...times, null].map((time, i) => defer(() => of(time).pipe(
// emit the first one immediately
// emit all others at the previously emitted time
delay(i === 0 ? 0 : +times[i - 1] - +DateTime.utc())
)))),
// combine into a single Observable
switchMap(observables => concat(...observables)),
);
- Сортировка необходима, поскольку каждый внутренний Observable («подождите X миллисекунд, затем сообщите время») подписывается по завершении предыдущего.
- Удаление дубликатов не является строго необходимым, но, кажется, уместно.
defer
используется для расчета текущего времени при подписке на внутреннюю Наблюдаемую.
concat
используется для последовательного выполнения каждого внутреннего наблюдаемого (благодаря martin ), что позволяет избежать накладных расходов на подписку для каждого времени в списке одновременно.
То, как это удовлетворяет мою первоначальную потребность:
У меня есть наблюдаемая RxJS, которую нужно пересчитывать в определенное время, как описано массивом объектов DateTime ...
- если я объединю это с данными, требующими пересчета, используя оператор combineLatest
для запуска этого пересчета в правильное время:
const timeAwareData$ = combineLatest(timeUnawareData$, nextTime$).pipe(
tap(() => console.log('either the data has changed or a time has been reached')),
// ...
);
Что мне не нравится в моем решении
Создает отдельную внутреннюю Наблюдаемую одновременно для каждого времени в списке. Я чувствую, что возможно сделать рефакторинг таким образом, что каждый внутренний Observable создается только после уничтожения предыдущего. Будут с благодарностью приняты любые советы по улучшению.