В Rx JS должен ли вызываться complete в наблюдаемом, который не генерирует никаких элементов? - PullRequest
0 голосов
/ 13 июля 2020

В моем последнем посте я пытался буферизовать ожидающие HTTP-запросы с помощью Rx JS. Я думал, что bufferCount - это то, что мне нужно, но я обнаружил, что мои элементы были меньше размера буфера, он просто ждал, а я не хотел этого.

Теперь у меня есть новая схема, использующая take. Кажется, он делает то, что я хочу, за исключением случаев, когда в моем результирующем наблюдаемом нет элементов (слева), полное никогда не вызывается.

Например, у меня есть что-то вроде следующего ..

      const pendingRequests = this.store$.select(mySelects.getPendingRequests).pipe(
      // FlatMap turns the observable of a single Requests[] to observable of Requests
      flatMap(x => x),

      // Only get requests unprocessed
      filter(x => x.processedState === ProcessedState.unprocessed),

      // Batches of batchSize in each emit
      take(3),
    );
    
    let requestsSent = false;
    pendingRequests.subscribe(nextRequest => {           
      requestsSent = true;
      this.sendRequest(nextEvent);           
     },
      error => {            
        this.logger.error(`${this.moduleName}.sendRequest:  Error ${error}`);
      },
      () => {         
        // ****  This is not called if pendingRequests is empty ****
        if (requestsSent ) {              
          this.store$.dispatch(myActions.continuePolling());              
        } else {
          this.store$.dispatch(myActions.stopPolling());              
        }
      }
    );

Таким образом, take(3) получит следующие 3 ожидающих запроса и отправит их (), где я также отправляю действие, чтобы установить обработанное состояние на not ProcessedState.pending, чтобы мы не получили их в следующем опросе)

Все это работает нормально, но когда pendingRequests в конечном итоге ничего не возвращает (пуст), блок completed отмечен знаком ****. не называется. Я бы подумал, что это будет вызвано сразу.

Я не уверен, имеет ли это значение, поскольку, поскольку я не отправляю действие для продолжения опроса, опрос останавливается.

Но меня больше всего беспокоит то, что если pendingRequests не заполнено, нужно ли мне отказаться от подписки, чтобы предотвратить утечки? Я предполагаю, что если complete вызывается , мне не нужно отказываться от подписки?

Обновление

Чтобы pendingReguests всегда выполнялся, я немного другой подход. Вместо того, чтобы использовать операторы rx для «фильтрации», я просто получаю каждый раз весь список и только take(1) в нем. Я всегда буду получать список, даже если он пуст, поэтому pendingReguests будет завершаться каждый раз.

ie

const pendingRequests = this.store$.select(mySelects.getPendingRequests).pipe(take(1))

И тогда я могу просто фильтровать и загружать внутри наблюдаемый ..

 pendingRequests.subscribe(nextRequest => {
      let requestToSend = nextRequest.filter(x => x.processedState === ProcessedState.unprocessed);
      const totalPendingCount = requestToSend.length;
      requestToSend = requestToSend slice(0, this.batchSize);          
      for (const nextRequest of requestToSend) {                  
          this.sendRequest(nextRequest);            
      }

      if (totalPendingCount > this.batchSize) {
        this.store$.dispatch(myActions.continuePolling());            
      }

В моем тестировании до сих пор я всегда получал complete для запуска.

Кроме того, имея 2 действия (startPolling и continuePolling ) Я могу поместить задержку только в continuePolling, поэтому при первом запуске опроса (например, приложение только что вернулось в сеть после того, как оно вышло за пределы диапазона сети), мы отправляем сразу же и задерживаем, только если у нас больше, чем размер пакета

Возможно, это не 100% способ "rxy", но, похоже, пока работает. Здесь какие-то проблемы?

Ответы [ 2 ]

0 голосов
/ 13 июля 2020

Я сомневаюсь, что pendingRequests когда-нибудь завершится сам по себе. Store, по крайней мере, в ngrx, это BehaviorSubject. Таким образом, всякий раз, когда вы выполняете store.select() или store.pipe(select()), вы просто добавляете другого подписчика во внутренний список подписчиков, поддерживаемый BehaviorSubject.

BehaviorSubject расширяет Subject , и вот что происходит, когда Subject подписывается на :

this.observers.push(subscriber);

В вашем случае вы используете take(3). После 3 значений take выдаст полное уведомление, поэтому ваш обратный вызов complete должен быть вызван. И поскольку вся цепочка на самом деле является подписчиком BehaviorSubject, она удалит себя из списка подписчиков при уведомлениях complete.

Я предполагаю, что если вызывается полная, мне не нужно отказаться от подписки

Вот что происходит, когда подписчик (например, TakeSubscriber) завершает :

  protected _complete(): void {
    this.destination.complete();
    this.unsubscribe();
  }

Итак, есть не нужно отказываться от подписки, если уведомление complete / error уже пришло.

0 голосов
/ 13 июля 2020

Я бы заменил take на toArray и немного буферизовал logi c впоследствии.

Вот как может выглядеть ваш код. Я добавил delay logi c, который, я думаю, был предложен вашим предыдущим сообщением, и предоставил комментарии для описания каждой добавленной строки

// implementation of the chunk function used below
// https://www.w3resource.com/javascript-exercises/fundamental/javascript-fundamental-exercise-265.php
   const chunk = (arr, size) =>
      Array.from({ length: Math.ceil(arr.length / size) }, (v, i) =>
        arr.slice(i * size, i * size + size)
      );

  const pendingRequests = this.store$.select(mySelects.getPendingRequests).pipe(
  // FlatMap turns the observable of a single Requests[] to observable of Requests
  flatMap(x => x),

  // Only get requests unprocessed
  filter(x => x.processedState === ProcessedState.unprocessed),

  // Read all the requests and store them in an array
  toArray(),
  // Split the array in chunks of the specified size, in this case 3
  map(arr => chunk(arr, 3)), // the implementation of chunk is provided above
  // Create a stream of chunks
  concatMap((chunks) => from(chunks)),
  // make sure each chunk is emitted after a certain delay, e.g. 2 sec
  concatMap((chunk) => of(chunk).pipe(delay(2000))),
  // mergeMap to turn an array into a stream
  mergeMap((val) => val)
);

let requestsSent = false;
pendingRequests.subscribe(nextRequest => {           
  requestsSent = true;
  this.sendRequest(nextEvent);           
 },
  error => {            
    this.logger.error(`${this.moduleName}.sendRequest:  Error ${error}`);
  },
  () => {         
    // ****  THIS NOW SHOULD BE CALLED ****
    if (requestsSent ) {              
      this.store$.dispatch(myActions.continuePolling());              
    } else {
      this.store$.dispatch(myActions.stopPolling());              
    }
  }
);
...