Angular / RxJS: Как сгенерировать последовательность задержанных значений? - PullRequest
0 голосов
/ 04 июня 2018

У меня есть массив сообщений.Каждые N секунды мне нужно выдавать сообщение из Observable.

Я пробовал следующий код.Первое сообщение было отправлено с задержкой, и я получил его в подписчике, но генерация других сообщений в последовательности была остановлена.Если я удаляю вызов метода .delay(), я получаю необходимую последовательность, подписчик реагирует на каждое сообщение, но без промежутка времени между сообщениями.Как я могу это исправить?

import { Observable } from 'rxjs/Observable';
import { fromArray } from 'rxjs/observable/fromArray';
import 'rxjs/add/operator/delay';


@Injectable({
  providedIn: 'root'
})
export class MessageService {

  constructor() { }

  getIncomingMessagesStream(): Observable<string> {
    const messageTimeout = 2000;

    const messages = ['Hi there!', 'How are you?', 'That is awesome :)'];

    return fromArray(messages)
      .delay(messageTimeout);
  }
}

Пример подписчика:

messageService.getIncomingMessagesStream()
    .subscribe(message => console.log('New incoming message ', message) );

1 Ответ

0 голосов
/ 04 июня 2018

Вам нужно заставить цепочку испускать следующий элемент только после того, как предыдущий элемент был задержан:

import { from, of } from 'rxjs';
import { delay, concatMap } from 'rxjs/operators';

...

return from(messages).pipe(
  concatMap(item => of(item).pipe(delay(messageTimeout))),
);

Кстати, вы комбинируете стили RxJS <5.5 и RxJS> = 5.5 (конвейерные и прототипные стили)операторов).Лучше придерживаться только с RxJS 5.5 и избегать import 'rxjs/add/operator/concatMap';.См. https://github.com/ReactiveX/rxjs/blob/master/doc/pipeable-operators.md

Июль 2019: обновлено для RxJS 6.

См. Демонстрационный пример: https://stackblitz.com/edit/rxjs5-hibr4m?file=index.ts

...