Наблюдаемый от сервера опроса массива - PullRequest
0 голосов
/ 15 октября 2018

Я пытаюсь создать Observable из массива элементов, каждый из которых регулярно проверяет наличие обновлений сервера, а затем отправляет действие, когда получает желаемый результат для каждого элемента.

Ответ ниже полезен, но не совсем то, что я ищу

Это другой подход, который я пробовал:

export function handleProcessingScenes(action$,store) {
  return action$.ofType(REQUEST_ALL_SCENES_BY_LOCATION_FULFILLED)
    .switchMap(({ scenesByLocation }) => Observable.from(scenesByLocation))
    .filter(scene => scene.scenePanoTask)
    .mergeMap(scene => updateScene(scene))
}

function updateScene(scene) {
  return Observable.interval(3000)
    .flatMap(() => requestSceneUpdates(scene.id))
    .takeWhile(res =>  res.payload.status < 4)
    .timeout(600000, Observable.throw(new Error('Timeout')))

}

Функция APIвозвращает Observable

export function requestSceneUpdates(sceneId){

  console.log('requestSceneUpdate')

  const request = fetch(`${API_URL}/scene/task/${sceneId}/update`, {
    method: 'get',
    credentials: 'include',
    crossDomain: true,
  }).then(res => res.json())

  return Observable.fromPromise(request)
}

Однако это вызывает функцию 'requestSceneUpdate' только один раз.

Я хочу вызывать эту функцию каждые 3 секунды для каждой сцены в scenesByLocation.Затем я хочу вернуть действие, когда каждый из них закончен.

Эпос, который у меня есть для одной сцены:

export function  sceneProcessingUpdate(action$) {
  return action$.ofType(REQUEST_SCENE_PROCESSING_TASK_SUCCESS)
    .switchMap(({task}) =>
      Observable.timer(0, 30000).takeUntil(action$.ofType( REQUEST_SCENE_PROCESSING_TASK_UPDATE_SUCCESS))
        .exhaustMap(() =>
          requestSceneUpdates(task.id)
            .map((res) => {
              if (res.error) 
                return { type: REQUEST_SCENE_PROCESSING_TASK_UPDATE_FAILED, message: res.message }
              else if(res.payload.status === 4) 
                return { type: REQUEST_SCENE_PROCESSING_TASK_UPDATE_SUCCESS, task:  res.payload }
              else 
                return requestSceneProcessingTaskMessage(res.payload)
            })
            .catch(err => { return { type: REQUEST_SCENE_PROCESSING_TASK_UPDATE_FAILED, message: err } })
        )
    )
}

Ответы [ 2 ]

0 голосов
/ 15 октября 2018

В конце концов это сработало, @Andrew исправил первую часть.

    export function handleProcessingScenes(action$,store) {
  return action$.ofType(REQUEST_ALL_SCENES_BY_LOCATION_FULFILLED)
    .switchMap(({ scenesByLocation }) => Observable.from(scenesByLocation))
    .filter(scene => scene.scenePanoTask)
    .flatMap(scene => {
      return Observable.timer(0, 5000).takeUntil(action$.ofType( REQUEST_SCENE_PROCESSING_TASK_UPDATE_SUCCESS))
        .exhaustMap(() =>
          requestSceneUpdates(scene.id)
            .map((res) => {

              if (res.error) 
                return { type: REQUEST_SCENE_PROCESSING_TASK_UPDATE_FAILED, message: res.message }
              else if(res.payload.status === 4) 
                return { type: REQUEST_SCENE_PROCESSING_TASK_UPDATE_SUCCESS, task:  res.payload }
              else 
                return requestSceneProcessingTaskMessage(res.payload)
            })
            .catch(err => { return { type: REQUEST_SCENE_PROCESSING_TASK_UPDATE_FAILED, message: err } })
        )
    })
}
0 голосов
/ 15 октября 2018

Я думаю, тебе нужно что-то подобное.Идея состоит в том, чтобы повторить обновление сцены в случае сбоя через 3 секунды и не использовать таймер.

export function handleProcessingScenes(action$) {
  return action$.ofType(REQUEST_ALL_SCENES_BY_LOCATION_FULFILLED)
    .switchMap(({ scenesByLocation }) => Observable.from(scenesByLocation))
    .filter(scene => scene.scenePanoTask)
    .mergeMap(scene => updateScene(scene));
}

function updateScene(scene) {
  return requestSceneUpdates(scene.id)
    .map((res) => {
      if (res.error)
        throw res.error;
      else if (res.payload.status === 4)
        return { type: REQUEST_SCENE_PROCESSING_TASK_UPDATE_SUCCESS, task: res.payload }
      else
        return requestSceneProcessingTaskMessage(res.payload)
    })
    .retryWhen(errors => errors.delay(3000));
}
...