Как мне собрать данные из различных запросов, используя rx js в angular не безобразным способом? - PullRequest
1 голос
/ 20 апреля 2020

Мой вариант использования настолько прост, насколько я мог бы подумать, что он вездесущ, но я не могу найти ни одного учебника, который бы объяснял его прилично.

Я хочу получить различные данные в onInit () моего Модуль angular, некоторые последовательные, некоторые параллельные. После того, как все это сделано, мне нужно выполнить синхронные операции с собранными данными. Я хочу, чтобы мой код читался наполовину.

Предположим, у меня есть следующее (все наблюдаемые возвращаются один раз, без фактических потоков):

  • 1) Наблюдаемый параметр маршрутизации для получения идентификатора от
  • 2) Запрос на получение моей основной сущности с использованием этого идентификатора (последовательный)
  • 3) Три запроса к различным конечным точкам для получения дополнительных данных с использованием идентификатора, полученного из второй точки (последовательно для пункт 2, но может выполняться одновременно друг с другом)

Я просто хочу получить все эти данные, упаковать их в одну наблюдаемую, подписаться на нее, чтобы я мог выполнить свою фактическую операцию в обратном вызове этих подписок .

То, что я ищу: простая структура псевдокода / кода, которая сообщает мне, какие операторы в какой последовательности ставить. Бонусные баллы за «почему».

$ Obs1.pipe (mergemap (...))

Дополнительный бонус за читабельность, если Rx js в все совместимо с чистым кодом. Да, я разочарован своей неспособностью это сделать и полным отсутствием примеров по этому вопросу.

Ответы [ 3 ]

2 голосов
/ 20 апреля 2020
requestRest(data1) {
  // parallel request logic encapsulated in a function
  return forkJoin(
    this.req2(data1),
    this.req3(data1),
    this.req4(data1)
  );
}

ngOnInit() {
  // store id observable in a reusable variable (just to illustrate concept)
  const id$ = this.route.params.pipe(
    map(p => p['id'])
  );

  id$.pipe(
    // switch off id into req1
    switchMap(id => this.req1(id)),
    // switch off req1 into rest of data
    switchMap(data1 => this.requestRest(data1).pipe(
      // inner map to combine data1 with rest of data
      map(restOfData => [data1, ...restOfData])))
  ).subscribe(
     ([d1, d2, d3, d4]) => console.log("got all the data")
  )
}

rx js - мощная библиотека для построения конвейеров для обработки потоков данных. Вы можете строить и составлять разные конвейеры, как считаете нужным. Существует некоторая кривая обучения, но как только вы ее преодолеете, вы сможете обрабатывать любой поток данных, о котором только можете подумать. Каждое внешнее событие (http-запросы, веб-наборы, входные данные форм, тайм-ауты, интервалы, щелчки, перемещения мыши и т. Д. c) представляет собой поток данных, и это делает rx js очень ценным инструментом для изучения.

Это своего рода тривиальный пример, поскольку rx js streams go, как последовательное и параллельное выполнение и преобразования, являются основой. Когда вы рассматриваете такие вещи, как обработка ошибок, повторная логика c, кеширование и другие, то есть то, что нужно большинству производственных приложений, оно становится еще более мощным.

switchMap само по себе является одним из самых мощных инструменты, предлагаемые rx js, так как он обрабатывает отмену logi c, если параметр id должен был измениться во время выполнения ваших запросов, гарантируя, что вы получите результаты с правильным идентификатором, и не рискуете ошибками в условиях гонки между запросами.

0 голосов
/ 20 апреля 2020

Пожалуйста, проверьте это: https://stackblitz.com/edit/ewi2gy

Я знаю, что кривая обучения крутая, но иногда лучше немного потрудиться, чтобы научиться правильно, получая ответ, работающий точно так же, как вы хочу, не поможет в долгосрочной перспективе.

import { forkJoin, of, timer, Observable } from "rxjs";
import Axios from "axios-observable";
import { mapTo, switchMap, map, tap, mergeMap } from "rxjs/operators";

interface Post {
  id: number;
  title: string;
}

interface Comment {
  id: number;
  body: string;
  postId: number;
}

interface Profile {
  name: string;
}

interface FullPost {
  id: number;
  title: string;
  comments: Comment[];
  profile: string;
}

// this observable gives you the id, assuming from query parameter
const id = of(1);

// then start the stream with that id
const postsObservable = id
  .pipe(
    // map the id to the fetched post
    switchMap((id: number) =>
      Axios.get("https://my-json-server.typicode.com/typicode/demo/posts/" + id)
    ),
    // convert response into an object of type Post
    map(response => response.data),
    // fork your requests to get other data, simplistic example only to make the PoC
    mergeMap((post: Post) =>
      forkJoin({
        // get comments
        comments: Axios.get<Comment[]>(
          "https://my-json-server.typicode.com/typicode/demo/comments?postId=" +
            post.id
        ),
        // get something else ...
        profile: Axios.get<Profile>(
          "https://my-json-server.typicode.com/typicode/demo/profile"
        ),
        // give back also the post as Observable, even more simplistic example
        post: of(post)
      })
    ),
    map(valueObject => {
      // work out the aggregation you got here
      return {
        id: valueObject.post.id,
        title: valueObject.post.title,
        comments: valueObject.comments.data,
        profile: valueObject.profile.data.name,
       } as FullPost;
    })
  )
  .subscribe(fullPost =>  console.log(fullPost));
0 голосов
/ 20 апреля 2020

Здесь может быть один подход:

ngOnInit() {
  this.router.queryParamMap
    .pipe(
      map(getYourId),
      exhaustMap(
        entityId => this.http.get(/* ... */).pipe(
          exhaustMap( // `entityId` also available here
            results => forkJoin([
              of(results),
              this.http.get(''), // Request 1
              this.http.get(''), // Request 2
              this.http.get(''), // Request 3
            ])
          )
        )
      ),
    )
    .subscribe()
}

exhaustMap может иметь только одну активную внутреннюю наблюдаемую величину, и если поступит внешнее значение, оно будет пропущено. Все пропускается, пока текущая внутренняя наблюдаемая не завершит .

В этом случае, я бы не сказал, имеет значение, какой из exhaustMap, mergeMap, concatMap, switchMap вы используете.

Первый exhaustMap просто делает запрос, а следующий exhaustMap подпишется на внутреннюю наблюдаемую, созданную на основе данных, поступающих из первого (results).

forkJoin подпишется на предоставленные ему наблюдаемые одновременно, но вы получите массив их ответов только после того, как у каждого наблюдаемого будет испущено хотя бы один раз и завершено .

...