Angular rxjs Observables найти первый живой сервер - PullRequest
2 голосов
/ 10 мая 2019

Я хочу реализовать что-то вроде того, что позволяет плагин promiseAny, но для Observables, где первым, кто получит ответ, является «победитель».Специально для Angular http.get() s для разных возможных серверов.

У меня есть следующее, однако он возвращает результат для всех объединенных Observables, которые представляют http.get для каждого сервера.Один из серверов жив, а тот, которого я знаю, мертв.Тем не менее, значение, возвращаемое из подписки, имеет 2 значения, и результаты не указывают на то, что одно работает, а другое - нет.Подписка (http.get()), похоже, не запускается.Как мне написать это?

Это для Angular 7.2.

import {merge} from 'rxjs';
import {take} from 'rxjs/operators';

async getActiveServer(servers: string[]): Promise<string> {
    return new Promise(async (resolve, reject) => {
        merge(this.buildObservables(servers)).pipe(take(1))
            .subscribe((value) => {
                // .flatMap((value) => {
                console.log(`observable - value: ${JSON.stringify(value, null, 2)}`);
                if (Array.isArray(value) && value.length > 0) {
                    resolve(this.findServer(value[0]));
                } else {
                    reject('cannot find server as response is not an array - it is: ${value}');
                }
            }, (error) => {
                console.log(`observable - error: ${error}`);
            });
    });

private async buildObservables(servers: string[]): Promise<any> {
    const observablesBatch = [];
    for (const server of servers) {
        observablesBatch.push(this.http.get<any>(server + '/health/alive?server=' + server));
    }
    return observablesBatch;
}

findServer() имеет дело с отдельной проблемой, которую я получаю, возвращая структуру вложенных объектов.Этот метод просматривает структуру, чтобы найти URL-адрес, и извлекает нужную информацию из строки.

Значение, напечатанное из console.log( observable - value: `, выглядит примерно так:

observable - value: [
  {
    "_isScalar": false,
    "source": {
      "_isScalar": false,
      "source": {
        "_isScalar": false,
        "source": {
          "_isScalar": true,
          "value": {
            "url": "http://localhost:8080/health/alive?server=http://localhost:8080",
            "body": null,
            "reportProgress": false,
            "withCredentials": false,
            "responseType": "json",
            "method": "GET",
            "headers": {
              "normalizedNames": {},
              "lazyUpdate": null,
              "headers": {}
            },
            "params": {
              "updates": null,
              "cloneFrom": null,
              "encoder": {},
              "map": null
            },
            "urlWithParams": "http://localhost:8080/health/alive?server=http://localhost:8080"
          }
        },
        "operator": {
          "concurrent": 1
        }
      },
      "operator": {}
    },
    "operator": {}
  },
  {
    "_isScalar": false,
    "source": {
      "_isScalar": false,
      "source": {
        "_isScalar": false,
        "source": {
          "_isScalar": true,
          "value": {
            "url": "https://remoteServer.net//health/alive?server=https://remoteServer.net/",
            "body": null,
            "reportProgress": false,
            "withCredentials": false,
            "responseType": "json",
            "method": "GET",
            "headers": {
              "normalizedNames": {},
              "lazyUpdate": null,
              "headers": {}
            },
            "params": {
              "updates": null,
              "cloneFrom": null,
              "encoder": {},
              "map": null
            },
            "urlWithParams": "https://remoteserver.net//health/alive?server=https://remoteserver.net/"
          }
        },
        "operator": {
          "concurrent": 1
        }
      },
      "operator": {}
    },
    "operator": {}
  }
]

AsВы можете видеть, что я попробовал flatMap(), но это не сработало за то время, которое я ему выделил.

Как мне написать это?


1.Я дал ответ о том, что сработало, основываясь на ответе @ Phix.

2.Редактирует - решение с использованием race, как предложено @Adrian Brand.

Мне нравится (если оно работает), но оно не работает.У меня нет времени, чтобы решить это, и согласно сообщению Адриана, это должно сработать.Синтаксическая ошибка, которую я получаю: Property subscribe does not exist on MonoTypeOperatorFunction<any>.

ЭТО НЕ РАБОТАЕТ, НО ЭТО БЫ ХОРОШО, ЕСЛИ ЭТО СДЕЛАНО (хотя нужно добавить фильтрацию или подобное).

async getActiveServer(servers: string[]): Promise<string> {
    return new Promise(async (resolve, reject) => {
    race(...this.buildObservables(servers))
        .subscribe(r => {
            console.log('Found a live server:', r);
            resolve(r.alive);
        }, () => console.warn('Nothing is alive.'));
    });
}

Ответы [ 3 ]

1 голос
/ 10 мая 2019

Я думаю, что это примерно то же направление, что и вы. Сначала я подумал, что race может работать, но это вернет все, что разрешится первым, включая ошибки.

import { merge, of, race } from 'rxjs'; 
import { first, filter, catchError } from 'rxjs/operators';

// Combine all http requests
merge(...buildObservables())
  .pipe(
    // Only let through those that have a good response code
    filter((server: any) => server.response < 400),
    // Just take the first one
    first(),
  )
  .subscribe(r => console.log('Found a live server:', r), () => console.warn('Nothing is alive.'))

// Builds mock server responses
function buildObservables() {
  const responses = [];
  for(let i = 0; i < 4; i++) {
    responses.push(mockResponse(`http://sub${i}.example.com/api/v1`));
  }

  return responses;
}

function mockResponse(url: string) {
  const timeout = Math.round(Math.random() * 3000)
  return new Promise((resolve, reject) => {
    setTimeout(() => {
      if (Math.random() < .5) {
        resolve({
          server: url,
          response: 200
        })
      } else {
        reject({
          server: url,
          response: 500
        })
      }
    }, timeout)
  })
}

Stackblitz

1 голос
/ 10 мая 2019

Вот что работает, основываясь на ответе @ Phix (обязательно их проголосуйте):

async getActiveServer(servers: string[]): Promise<string> {
        return new Promise(async (resolve, reject) => {
            merge(...this.buildObservables(servers))
                .pipe(
                    filter((server: any) => server.hasOwnProperty('alive')),
                    first()
                )
                .subscribe(r => {
                    console.log('Found a live server:', r);
                    resolve(r.alive);
                }, () => console.warn('Nothing is alive.'));
        });
    }

    private buildObservables(servers: string[]): Observable<any>[] {
        const observablesBatch: Observable<any>[] = [];
        for (const server of servers) {
            observablesBatch.push(this.http.get<any>(server + '/health/alive?server=' + server));
        }
        return observablesBatch;
    }

Обратите внимание, что конечная точка alive на моем сервере возвращается:

async alive(server): Promise<any> {
    return Promise.resolve({alive: server});
}
1 голос
/ 10 мая 2019

Вы пробовали использовать расу вместо?

https://www.learnrxjs.io/operators/combination/race.html

...