Каков наилучший способ реализовать опрашивающее устройство с тайм-аутом в качестве реактивного потока. - PullRequest
0 голосов
/ 08 мая 2018

Каков наилучший способ для моделирования опроса с тайм-аутом, когда определенное условие вызывает ранний выход как «реактивные потоки»?

, например

Если бы у меня была наблюдаемая, которая производила бы убывающую последовательность натуральных чисел каждую секунду

9,8,7,6,5,4,3,2,1,0

Каков наилучший способ написать потребителя, который принимает последнее одиночное событие через 5 секунд ИЛИ событие '0', если оно было создано раньше, чем время ожидания.

Это мой код в его нынешнем виде: (пример на Java)

    int intialValue = 10;

    AtomicInteger counter = new AtomicInteger(intialValue);
    Integer val = Observable.interval(1, TimeUnit.SECONDS)
                            .map(tick -> counter.decrementAndGet())
                            .takeUntil(it -> it == 0)
                            .takeUntil(Observable.timer(5, TimeUnit.SECONDS))
                            .lastElement()
                            .blockingGet();

    System.out.println(val);

если initialValue = 10, я ожидаю, что 6 напечатает. если initialValue = 2, я ожидаю, что 0 напечатает до истечения 5-секундного таймаута.

Мне интересно, есть ли лучший способ сделать это.

1 Ответ

0 голосов
/ 09 мая 2018

Я не думаю, что есть действительно лучший способ, чем вы. Вы должны иметь следующее:

  • Интервал излучения (interval)
  • Агрегатор для уменьшения и сохранения последнего значения (scan)
  • Условие завершения на значение (takeWhile)
  • Условие завершения по времени (takeUntil(timer(...)))
  • Получить последнее значение при завершении (last)

Каждый из них представлен оператором. Вы не можете сделать много, чтобы обойти это. Я использовал несколько разных операторов (scan для агрегации и takeWhile для завершения по значению), но это то же количество операторов.

const { interval, timer } = rxjs;
const { scan, takeWhile, takeUntil, last, tap } = rxjs.operators;

function poll(start) {
  console.log('start', start);
  interval(1000).pipe(
    scan((x) => x - 1, start),
    takeWhile((x) => x >= 0),
    takeUntil(timer(5000)),
    tap((x) => { console.log('tap', x); }),
    last()
  ).subscribe(
    (x) => { console.log('next', x); },
    (e) => { console.log('error', e); },
    () => { console.log('complete'); }
  );
}

poll(10);
setTimeout(() => { poll(2); }, 6000);
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.1.0/rxjs.umd.min.js"></script>

Мне не ясно, как вы ожидаете, что он будет функционировать на границах. В вашем примере вы всегда уменьшаете перед излучением, поэтому если ваше начальное значение равно 10, то вы излучаете 9, 8, 7, 6 (4 значения). Если вы хотите начать с 10, то. Вы могли бы сделать scan(..., start + 1), но это привело бы к 7, потому что таймер в takeUntil(...) совпадает с интервалом источника, так что 6 будет исключено. Если вы хотите выдать 5 значений, вы можете сделать takeUntil(timer(5001)). Кроме того, если вы не хотите ждать секунды, чтобы выдать первое значение, вы можете поставить startWith(start) сразу после scan(...). Или вы можете сделать timer(0, 1000) с scan(..., start + 1) вместо исходного интервала.

Также обратите внимание, что завершение по значению (takeWhile) не завершится, пока не будет получено недопустимое значение (-1). Таким образом, он будет продолжаться в течение секунды после получения значения завершения (0). Похоже, что большинство операторов завершения работают таким образом, что, если они заканчиваются на каком-то значении, они не пропустят другие.

Вы могли бы сделать take(5) вместо takeUntil(timer(5000)), потому что вы знаете, что оно срабатывает с совпадающим интервалом, если это работает для вашего сценария. Это также обойдёт проблему исключения последнего значения из-за выравнивания таймеров.

...