Издайте один раз, а затем каждую секунду после этого, пока значение не изменится - PullRequest
0 голосов
/ 31 декабря 2018

у меня это работает.При получении сообщения сокета значение отправляется немедленно и каждую секунду после этого (с увеличением возраста), пока сокет не будет получен снова.

Однако я хочу, чтобы он излучал каждую секунду независимо от того, был ли получен сокет.Так что он начинал излучать каждую секунду, но когда сокет получал, свойства менялись на новые, и они передавались каждую секунду.

Не могу понять, что делать.

updated: Observable<TargetDevice>;

this.updated = socketService.onMessage.pipe(
  filter(
    message =>
      message.messageType === SocketIoMessageType.Device &&
      message.data.id === this.id
  ),
  map((message: SocketIoMessage) => <Device>message.data),
  tap(d => this.setProps(d)),
  switchMap(d =>
    timer(0, 1000).pipe(
      tap(tick => (this.age = d.age + tick)),
      map(() => this)
    )
  )
);

Ответы [ 3 ]

0 голосов
/ 31 декабря 2018

Вы хотите использовать combineLatest и startWith для достижения желаемого поведения:

combineLatest(
    socketService.onMessage.pipe(
        startWith(DEFAULT_MESSAGE)
    ),
    timer(0, 1000)
).pipe(
    //...
)
0 голосов
/ 31 декабря 2018

Одним из возможных решений вашей проблемы является создание beforeSocketMessage$ потока и ограничение его оператором takeUntil.

const beforeSocketMessage$ = interval(1000).pipe(mapTo('beforeSocketMessage'), takeUntil(socketService.onMessage));


const message$ = socketService.onMessage.pipe(
    filter(/* you filter*/),
    map((message: SocketIoMessage) => <Device>message.data),
    tap(d => this.setProps(d)),
    switchMap((data) => interval(0, 1000).pipe(mapTo(data))),
    tap(tick => (this.age = d.age + tick)),
    mapTo(this)
)

this.update = merge(beforeSocketMessage$, message$);

Упрощенная версия на stackblitz.com: https://stackblitz.com/edit/rxjs-jwg2cb?devtoolsheight=60

0 голосов
/ 31 декабря 2018

Я думаю, вы хотите что-то подобное.Вы были правы с switchMap.Самое интересное в этом то, что он отписывается от источника, наблюдаемого после того, как было выпущено следующее значение.

socketService.onMessage.pipe(
    switchMap(value => interval(1000).map(() => value))
);

Проверьте это на RxJs Playground здесь

...