Как отложить передачу события с помощью rxpy / rxjs? - PullRequest
0 голосов
/ 17 мая 2018

У меня есть два потока событий.Один из индуктивного контура, другой - IP-камера.Автомобили будут проезжать через петлю, а затем ударить камеру.Я хочу объединить их, если события находятся в пределах N миллисекунд друг от друга (машина всегда будет сначала зацикливаться), но я также хочу, чтобы все несопоставленные события из каждого потока (любое из аппаратных средств могло выйти из строя) были объединены в один поток.Примерно так:

           ---> (only unmatched a's, None)
         /                                  \
stream_a (loop)                              \
         \                                    \
            --> (a, b) ---------------------------> (Maybe a, Maybe b)
         /                                    /
stream_b  (camera)                           /
         \                                  /
            --> (None, only unmatched b's)

Теперь, конечно, я могу взломать свой путь, выполнив хороший анти-паттерн ole Subject:

unmatched_a = Subject()

def noop():
    pass

pending_as = [[]]

def handle_unmatched(a):
    if a in pending_as[0]:
        pending_as[0].remove(a)
        print("unmatched a!")
        unmatched_a.on_next((a, None))

def handle_a(a):
    pending_as[0].append(a)
    t = threading.Timer(some_timeout, handle_unmatched)
    t.start()
    return a

def handle_b(b):
    if len(pending_as[0]):
        a = pending_as[0].pop(0)
        return (a, b)

    else:
        print("unmatched b!")
        return (None, b)

stream_a.map(handle_a).subscribe(noop)
stream_b.map(handle_b).merge(unmatched_a).subscribe(print)

Мало того, что это довольно хакерский, но хотя яя не наблюдал этого, я почти уверен, что при проверке очереди ожидания используется threading.Timer.Учитывая множество операторов rx, я почти уверен, что некоторая их комбинация позволит вам сделать это без использования Subject, но я не могу понять это.Как этого добиться?

Редактировать

Хотя по организационным и эксплуатационным причинам я бы предпочел придерживаться Python, я возьму ответ JavaScript rxjs и либо перенесу его, либо даже перепишувесь скрипт в узле.

Ответы [ 2 ]

0 голосов
/ 18 мая 2018

Я разработал стратегию, отличную от Cartant, и явно гораздо менее элегантную, что может дать вам несколько иной результат.Я прошу прощения, если я не понял вопроса и если мой ответ окажется бесполезным.

Моя стратегия основана на использовании switchMap на a $ и затем bufferTime на b $ .

Этот код генерируется при каждом timeInterval и генерирует объект, который содержит последний a полученный и массив b s, представляющие b s, полученные в течение интервала времени.

a$.pipe(
    switchMap(a => {
        return b$.pipe(
            bufferTime(timeInterval),
            mergeMap(arrayOfB => of({a, arrayOfB})),
        )
    })
)

Если arrayOfB пусто, это означает, что последние a в несоответствии.

Если arrayOfB имеет только один элемент, это означает, что последнему a соответствует b массива.

ЕслиarrayOfB имеет более одного элемента, это означает, что последнему a соответствует первый b массива, в то время как все остальные b s

Теперь нужно избегать выброса одного и того же a более одного раза, и здесь код становится немного грязным.

В итоге,TКод может выглядеть следующим образом

const a$ = new Subject();
const b$ = new Subject();

setTimeout(() => a$.next("a1"), 0);
setTimeout(() => b$.next("b1"), 0);
setTimeout(() => a$.next("a2"), 100);
setTimeout(() => b$.next("b2"), 125);
setTimeout(() => a$.next("a3"), 200);
setTimeout(() => b$.next("b3"), 275);
setTimeout(() => a$.next("a4"), 400);
setTimeout(() => b$.next("b4"), 425);
setTimeout(() => b$.next("b4.1"), 435);
setTimeout(() => a$.next("a5"), 500);
setTimeout(() => b$.next("b5"), 575);
setTimeout(() => b$.next("b6"), 700);
setTimeout(() => b$.next("b6.1"), 701);
setTimeout(() => b$.next("b6.2"), 702);
setTimeout(() => a$.next("a6"), 800);


setTimeout(() => a$.complete(), 1000);
setTimeout(() => b$.complete(), 1000);


let currentA;

a$.pipe(
    switchMap(a => {
        currentA = a;
        return b$.pipe(
            bufferTime(50),
            mergeMap(arrayOfB => {
                let aVal = currentA ? currentA : null;
                if (arrayOfB.length === 0) {
                    const ret = of({a: aVal, b: null})
                    currentA = null;
                    return ret;
                }
                if (arrayOfB.length === 1) {
                    const ret = of({a: aVal, b: arrayOfB[0]})
                    currentA = null;
                    return ret;
                }
                const ret = from(arrayOfB)
                            .pipe(
                                map((b, _indexB) => {
                                    aVal = _indexB > 0 ? null : aVal;
                                    return {a: aVal, b}
                                })
                            )
                currentA = null;
                return ret;
            }),
            filter(data => data.a !== null || data.b !== null)
        )
    })
)
.subscribe(console.log);
0 голосов
/ 18 мая 2018

Вы должны быть в состоянии решить проблему, используя auditTime и buffer.Как это:

function matchWithinTime(a$, b$, N) {
  const merged$ = Rx.Observable.merge(a$, b$);
  // Use auditTime to compose a closing notifier for the buffer.
  const audited$ = merged$.auditTime(N);
  // Buffer emissions within an audit and filter out empty buffers.
  return merged$
    .buffer(audited$)
    .filter(x => x.length > 0);
}

const a$ = new Rx.Subject();
const b$ = new Rx.Subject();
matchWithinTime(a$, b$, 50).subscribe(x => console.log(JSON.stringify(x)));

setTimeout(() => a$.next("a"), 0);
setTimeout(() => b$.next("b"), 0);
setTimeout(() => a$.next("a"), 100);
setTimeout(() => b$.next("b"), 125);
setTimeout(() => a$.next("a"), 200);
setTimeout(() => b$.next("b"), 275);
setTimeout(() => a$.next("a"), 400);
setTimeout(() => b$.next("b"), 425);
setTimeout(() => a$.next("a"), 500);
setTimeout(() => b$.next("b"), 575);
setTimeout(() => b$.next("b"), 700);
setTimeout(() => b$.next("a"), 800);
.as-console-wrapper { max-height: 100% !important; top: 0; }
<script src="https://unpkg.com/rxjs@5/bundles/Rx.min.js"></script>

Если возможно, что за значениями b близко следуют значения a, и вы не хотите, чтобы они сопоставлялись, вы можетеиспользуйте более конкретный аудит, например:

const audited$ = merged$.audit(x => x === "a" ?
  // If an `a` was received, audit upcoming values for `N` milliseconds.
  Rx.Observable.timer(N) :
  // If a `b` was received, don't audit the upcoming values.
  Rx.Observable.of(0, Rx.Scheduler.asap)
);
...