rx js - буфер испускаемых значений до тех пор, пока другой наблюдаемый не испускает, затем испускает как обычно - PullRequest
1 голос
/ 14 июля 2020

Я создаю объект регистратора, который асинхронно получает IP-адрес, а затем регистрирует все значения с этим IP-адресом. Он должен начать сбор журналов сразу после создания экземпляра, но выдать их только после получения IP-адреса; и после этого он должен излучать как обычно.

Вот мой класс:


class LoggerService {
  constructor() {
    let thisIp;
    const getIp = Observable.create(function(observer) {
      // doing it with a timeout to emulate bad network
      setTimeout(() => {
        fetch('https://api.ipify.org?format=json').then(response => response.json()).then(response => {
          thisIp = response.ip;
          console.log('fetched IP: ', thisIp);
          observer.next(response.ip);
          observer.complete();
        });
      }, 5000)
    });
    // this is where I plan to buffer logs until IP is obtained
    this.logStream = new Subject().pipe(buffer(getIp));
    // for starters - just log to the console with the IP address
    this.logStream.subscribe((value) => console.log(thisIp, value));

  }

  emit = (message) => this.logStream.next(message);
}

Но он работает не так, как мне нужно; он выводит все буферизованные значения в виде массива, но перестает выдавать их после получения IP:


const logger = new LoggerService();


setInterval(() => {
  logger.emit('Hey ' + Math.random())
}, 1000);

// I get five messages and that's it

Как мне заставить его выдавать мои значения даже после буферизации?

Ответы [ 2 ]

1 голос
/ 14 июля 2020

Вам не нужно «буферизовать» значения как таковые, но вы можете создать поток, который зависит от asyn c ipAddress$, поэтому значение не будет отправлено, пока IP-адрес не будет испускается. combineLatest подойдет для этой цели.

Давайте дадим LoggerService поток сообщений с именем message$ и простой log() метод, который проталкивает предоставленную строку через этот поток .

Мы можем создать поток messagesWithIpAddresses$, который использует combineLatest для создания наблюдаемого, который испускает предоставленное сообщение вместе с ipAddress$, но только после того, как оба фактически выдали значение.

export class LoggerService {
  private messages$ = new Subject<string>();

  public log(message: string): void {
    this.messages$.next(message);
  }

  constructor(service: GenericService) { 

    const messagesWithIpAddresses$ = this.messages$.pipe(
      mergeMap(message => combineLatest(service.ipAddress$, of(message)))
    );

    messagesWithIpAddresses$.subscribe(
      ([ip, message]) => {
        // actual logging logic would go here...
        console.log(`[${ip}] ${message}`);
      }
    );
  }
}

Поскольку of(message) будет выдавать немедленно, мы просто будем ждать ipAddress$. Но, если значение уже было отправлено, оно также будет выполнено немедленно.

Проверьте это работает StackBlitz

образец трансляции экрана

0 голосов
/ 14 июля 2020

Обновление : ответ ниже не сработает. После просмотра документации буфера ваш логгер Subject () по-прежнему не будет выдавать больше сообщений, потому что он срабатывает только при срабатывании getIp. В вашем коде getIp срабатывает только один раз после HTTP-запроса.

Я думаю, нам нужно больше деталей о том, чего вы хотите достичь, чтобы предложить правильный конвейер rx js.

Я бы удалил эту строку

observer.complete();  // remove it

Это сигнализирует подписчику о завершении потока, поэтому вы больше не получаете никаких сообщений.

...