Как приостановить обработку concatMap эффекта, но не отменить полную очередь? - PullRequest
0 голосов
/ 13 февраля 2019

Я использую NGRX и эффекты для базовых вещей в приложении.Приложение поддерживает связь Bluetooth и постоянно обновляет / записывает различные параметры.Однако иногда необходимо приостановить это обновление.

У меня возникают проблемы с приостановкой выполнения эффекта NGRX, который использует concatMap для помещения действий в последовательную очередь.Действия в очереди все еще должны обрабатываться после того, как был подан какой-то сигнал «продолжить», или когда свойство bluetoothService.paused снова становится ложным.Может быть много действий ReadFromDevice, помещенных в очередь в concatMap.

Служба bluetooth имеет логическое свойство this.bluetoothService.pauseCommunication, однако я не знаю, как интегрировать это в Effect.Я пробовал разные (возможно глупые) вещи, но до сих пор не получилось.К сожалению, в настоящее время я не могу изменить код bluetoothService.

Я знаю, что могу отменить полный concatMap, выдав ошибку, но это не то, что мне нужно.Мне просто нужно приостановить обработку, пока логический флаг не станет ложным.

Это упрощенный пример эффекта, который я использую


@Effect()
  readParameterFromDevice$: Observable<Action> = this.actions$.pipe(
    ofType<ReadFromDevice>(CommunicationActionTypes.ReadFromDevice),
    map(action => action.payload),
    concatMap(async request => {
        try {
          const result = await this.bluetoothService.readFromDevice(
            request
          );
          return new ReadSuccess({
             result
          });
        } catch (error) {
          return new ReadError({
             result
          });
        }
    })
  );

Было бы здорово, если бы кто-нибудь мог указать мне на правонаправление.

1 Ответ

0 голосов
/ 13 февраля 2019

Я думаю, что вы ищете оператор buffer.

Буферизует исходные наблюдаемые значения до тех пор, пока не будет получено закрытиеNotifier.

См. документы .

import { fromEvent, interval } from 'rxjs';
import { buffer } from 'rxjs/operators';

const clicks = fromEvent(document, 'click');
const interval = interval(1000);
const buffered = interval.pipe(buffer(clicks));
buffered.subscribe(x => console.log(x));

Или оператор bufferToggle.

Буферизует исходные наблюдаемые значения, начиная с выброса из отверстий и заканчивая, когда выходной сигнал closeSelector создает.

См. документы

import { fromEvent, interval, empty } from 'rxjs';
import { bufferToggle } from 'rxjs/operators';

const clicks = fromEvent(document, 'click');
const openings = interval(1000);
const buffered = clicks.pipe(bufferToggle(openings, i =>
  i % 2 ? interval(500) : empty()
));
buffered.subscribe(x => console.log(x));
...