Как заставить rx js сделать паузу / возобновить? - PullRequest
1 голос
/ 20 февраля 2020

Теперь есть массив, значением массива является ссылка на изображение, например:

const imageList = [
  'https://cdn.pixabay.com/photo/2020/02/16/20/29/new-york-4854718_960_720.jpg',
  'https://cdn.pixabay.com/photo/2020/02/14/16/04/mallorca-4848741_960_720.jpg',
  'https://cdn.pixabay.com/photo/2020/02/14/04/20/old-city-4847469_960_720.jpg',
  // more...
];

Я хочу использовать rxjs для их последовательной загрузки (я приложение Electron, поэтому я можете загрузить его)

Когда загрузка первого изображения будет завершена, загрузите второе изображение. Когда третье изображение загружается, пользователь нажимает кнопку паузы и ожидает завершения загрузки третьего изображения. Тогда больше никаких загрузок. Когда пользователь нажимает кнопку продолжения, загрузка начинается с четвертого изображения.

Я ссылаюсь на эту статью: Buffering (lossless) раздел в https://medium.com/@kddsky / pauseable-observables-in-rx js -58ce2b8c7dfd . Код в этой статье:

merge(
  source$.pipe( bufferToggle(off$, ()=>on$)  ),
  source$.pipe( windowToggle(on$, ()=>off$) )
).pipe(
  // then flatten buffer arrays and window Observables
  flatMap(x => x)
)

Демонстрационный URL: https://thinkrx.io/gist/cef1572743cbf3f46105ec2ba56228cd

Но в этом коде есть две проблемы, которые не соответствуют моим необходимо. Я не знаю, как его изменить.

  1. Я использую redux-observable, поэтому я хочу вызвать их двумя действиями: (this.props.start() / this.props.pause())
  2. После восстановления данные по-прежнему передаются по одному, а не одноразовый выпуск

Текущий код выглядит следующим образом:

export const epicDownloadResources = (
  action$: ActionsObservable<AnyAction>,
  store$: StateObservable<RootState>,
) => {
  return action$.pipe(
    ofType(appActions.other.start()),
    of([
      'https://cdn.pixabay.com/photo/2020/02/16/20/29/new-york-4854718_960_720.jpg',
      'https://cdn.pixabay.com/photo/2020/02/14/16/04/mallorca-4848741_960_720.jpg',
      'https://cdn.pixabay.com/photo/2020/02/14/04/20/old-city-4847469_960_720.jpg',
    ]),
    mergeMap(() => {
      // code
    }),
    mergeMap((url: string) => {
      // downloading
    })
}

В реальном продукте, он будет загружать внутренние ресурсы изображений компании, а не другие изображения, не защищенные авторским правом.

Ответы [ 2 ]

2 голосов
/ 21 февраля 2020

Я выбрал совершенно другой подход.

Если я правильно понимаю, вы хотите продолжить последовательно после возобновления работы пользователя. На самом деле это означает, что нет смысла делать управление окнами или буферизацией.

На мой взгляд, достаточно простого использования вложения concatMap.

const pause$ = fromEvent(pauseButton, "click").pipe(
 mapTo(false),
);

const resume$ = fromEvent(resumeButton, "click").pipe(
 mapTo(true),
);

const pauseResume$ = merge(pause$,resume$).pipe(
 startWith(true),
 shareReplay(1),
)

const source = of(...imageList).pipe(
 concatMap((url, i) =>
   pauseResume$.pipe(
     tap(v => console.log(`should resume ${v}`)),
     filter(v => v), // Only resume if true
     take(1),
     concatMap(() =>
       from(fetch(url)).pipe(
         delay(1000), // Simulate slow request
         mapTo(i) // just for logging which request we just completed
       )
     )
   )
 )
);
source.subscribe(x => console.log(x));

Это приостановит запуск новый запрос , пока резюме $ не отправит новое значение. Я полагаю, что это именно то, что вам нужно, исходя из вашего сценария.

Я не был уверен, хотите ли вы, чтобы третий сценарий выполнялся в вашем сценарии, пока пользователь приостановил его. Я предполагаю, что вы делаете, но если нет, вы можете использовать другой concatMap для pauseResume $ с фильтром после запроса.

stackblitz

1 голос
/ 20 февраля 2020

Вот моя попытка:

const urlArr = Array.from({ length: 10 }, (_, idx) => 'url/' + idx);
let idx = 0;

const urlEmitter = new Subject();
const url$ = urlEmitter.asObservable();
const stopEmitter = new Subject();
const stopValues$ = stopEmitter.asObservable();

const start$ = fromEvent(start, 'click');
start$.pipe(take(1)).subscribe(() => (stopEmitter.next(false), urlEmitter.next(urlArr[idx++]))); // Start emitting valeus

const stopSubscription = fromEvent(stop, 'click').pipe(mapTo(true)).subscribe(stopEmitter);

const shouldContinue$ = stopValues$.pipe(map(shouldStop => !shouldStop));

const subsequentStartClicks$ = start$.pipe(
  skip(1), // Skip the first time the `start` button is clicked
  observeOn(asyncScheduler), // Make sure it emits after the buffer has been initialized
  tap(() => stopEmitter.next(false)), // shouldContinue$ will emit `true`
);

const downloadedUrls$ = url$.pipe(
  mergeMap(url => of(url).pipe(delay(idx * 500))), // Simulate a file downloading
  combineLatest(shouldContinue$), // Make sure it acts according to `shouldContinue$`
  filter(([_, shouldContinue]) => shouldContinue),
  map(([v]) => v),
  tap((v) => console.warn(v)), // Debugging purposes...

  // Because of `combineLatest`
  // If you click `start` and wait some time, then you click `stop`
  // then you click again `start`, you might get the last value added to the array
  // this is because `shouldContinue$` emitted a new value
  // So you want to make sure you won't get the same value multiple times
  distinctUntilChanged(), 

  tap(() => urlEmitter.next(urlArr[idx++])),

  bufferToggle(
    start$,
    () => stopValues$.pipe(filter(v => !!v)),
  )
);

merge(
  subsequentStartClicks$.pipe(mapTo(false)), // Might not be interested in click events 
  downloadedUrls$
)
  .pipe(filter(v => !!v))
  .subscribe(console.log);

Я был вдохновлен диаграммой bufferToggle .

Моя идея состояла в том, чтобы следовать тому же подходу, с исключение: значения должны передаваться только тогда, когда поток start$ излучается, и должны останавливаться, когда stop$ сделал.

----X--X----------------------------------> urls$

-Y----------------------------------------> start$

-----------Z------------------------------> end$


-----------[X, X]-------------------------------> urls$

Каждый раз, когда нажимается кнопка stop, значение true вставляется в поток stopValues$. shouldContinue$ определяет, должен ли поток url$ продолжать выдавать значения или нет, в зависимости от stopValues$.

StackBlitz .

...