внедрить систему очередей с RxJs - PullRequest
0 голосов
/ 03 мая 2018

Мы реализовали систему очередей в Javascript.

Потребители регистрируют элементы и внутренне помещаются в очередь (массив, хранящийся в хранилище сеансов).

Временной интервал используется для снятия с очереди нескольких элементов и отправки этих элементов на сервер.

Как мы можем реализовать это, используя Rxjs и потоки?

Ответы [ 2 ]

0 голосов
/ 07 мая 2018

Требуемый оператор: bufferTime

import { Subject } from 'rxjs/Subject';
import { bufferTime } from 'rxjs/operators';

const queue$ = new Subject();
const interval = 2000; // 2 seconds

queue$.pipe(bufferTime(interval))
    .subscribe(queueArray => {
        // Send to backend
        console.log(queueArray);
    });

queue$.next('hello');
queue$.next('world');

// After 2 seconds logs: ['hello', 'world']
0 голосов
/ 03 мая 2018

const queue = new Rx.Subject();

const queueProcessing = queue
  .mergeMap(i => Rx.Observable.of(i)
              .do(val => console.log('processing item: ' + val))
              .delay(2000) /* stub processing time*/
           , 2) /* concurrency */
  .subscribe();

queue.next('asdf');
queue.next('as');
queue.next('zxvc');
queue.next('`1`');
queue.next('zx');
queue.next('234');
queue.next('5');
queue.next('645');
queue.next('asdf');
queue.next('3');
queue.next('2');
queue.next('34');
queue.next('asdf');
queue.next('5');
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.5.1/Rx.js"></script>

.mergeMap содержит демонстрационную функцию «обработки», которая просто выдает значение после задержки. Каждый раз, когда вы хотите добавить элемент в очередь, вы .next() добавляете его в тему.

...