RxJS: Пакетные запросы и обмен ответами - PullRequest
0 голосов
/ 11 сентября 2018

Давайте представим, что у меня есть функция fetchUser, которая принимает в качестве параметра userId и возвращает наблюдаемого для пользователя .

Поскольку я часто вызываю этот метод, я хочу batch идентификаторов для выполнения одного запроса с несколькими идентификаторами вместо этого!

Здесь начались мои неприятности ...

Я не могу найти решение для этого, не разделяя наблюдаемое между различными вызовами fetchUser.

import { Subject, from } from "rxjs"
import { bufferTime, mergeMap, map, toArray, filter, take, share } from "rxjs/operators"

const functionThatSimulateAFetch = (userIds: string[]) => from(userIds).pipe(
    map((userId) => ({ id: userId, name: "George" })),
    toArray(),
)

const userToFetch$ = new Subject<string>()

const fetchedUser$ = userToFetch$.pipe(
    bufferTime(1000),
    mergeMap((userIds) => functionThatSimulateAFetch(userIds)),
    share(),
)

const fetchUser = (userId: string) => {
    const observable = fetchedUser$.pipe(
        map((users) => users.find((user) => user.id === userId)),
        filter((user) => !!user),
        take(1),
    )
    userToFetch$.next(userId)
    return observable
}

Но это ужасно, и у него много проблем:

  • Если я откажусь от заметки, возвращаемой fetchUser до того, как таймер bufferTime закончится, это не помешает извлечению пользователя.
  • Если я отписываюсь от всех наблюдаемых, возвращенных fetchUser до завершения выборки, запрос не отменяется.
  • Обработка ошибок более сложна
  • и т.д.

В целом: я не знаю, как решить проблемы, требующие совместного использования ресурсов с использованием RxJS. Трудно найти продвинутый пример RxJS.

Ответы [ 4 ]

0 голосов
/ 07 августа 2019

То, что у вас есть, хорошо, но как и все RxJS, но дьявол кроется в деталях.

Проблемы

  1. switchMap ing
        mergeMap((userIds) => functionThatSimulateAFetch(userIds)),

Здесь вы впервые ошибетесь.Используя здесь карту слияния, вы лишаете возможности сообщать appart «поток запросов» из «потока, возвращенного одним запросом»:

  • Вы почти не можете отказаться от подпискииндивидуальный запрос (отменить его)
  • Вы делаете невозможным обработку ошибок
  • Он падает, если ваша внутренняя наблюдаемая излучает более одного раза.

Скореевсе, что вы хотите - это испускать отдельные BatchEvent с, через обычные map (производя наблюдаемые из наблюдаемых), и switchMap / mergeMap те после фильтрации.

Побочные эффекты при создании наблюдаемой и излучающей перед подпиской
    userToFetch$.next(userId)
    return observable

Не делайте этого.Наблюдаемое само по себе ничего не делает.Это «план» для последовательности действий, которые происходят , когда вы подписываетесь на него.Сделав это, вы создадите пакетное действие только для создания, наблюдаемого, но вы облажаетесь, если вы получаете несколько или отложенные подписки.

Скорее, вы хотите создать наблюдаемое из defer, которое излучает вuserToFetch$ на каждую подписку.

Даже тогда вы захотите подписаться на свою наблюдаемую до , отправляющую на userToFetch: если вы не подписаны, ваша наблюдаемая не слушаетпредмет, и событие будет потеряно.Вы можете сделать это в виде, похожем на отложенное.

Решение

Короткое и не сильно отличающееся от вашего кода, но структурируйте его следующим образом.

const BUFFER_TIME = 1000;

type BatchEvent = { keys: Set<string>, values: Observable<Users> };

/** The incomming keys */
const keySubject = new Subject<string>();

const requests: Observable<{ keys: Set<string>, values: Observable<Users> }> =
  this.keySubject.asObservable().pipe(
    bufferTime(BUFFER_TIME),
    map(keys => this.fetchBatch(keys)),
    share(),
  );

/** Returns a single User from an ID. Batches the request */
function get(userId: string): Observable<User> {
  console.log("Creating observable for:", userId);
  // The money observable. See "defer":
  // triggers a new subject event on subscription
  const observable = new Observable<BatchEvent>(observer => {
    this.requests.subscribe(observer);
    // Emit *after* the subscription
    this.keySubject.next(userId);
  });
  return observable.pipe(
    first(v => v.keys.has(userId)),
    // There is only 1 item, so any *Map will do here
    switchMap(v => v.values),
    map(v => v[userId]),
  );
}

function fetchBatch(args: string[]): BatchEvent {
  const keys = new Set(args); // Do not batch duplicates
  const values = this.userService.get(Array.from(keys)).pipe(
    share(),
  );
  return { keys, values };
}

Этоделает именно то, что вы просили, в том числе:

  • Ошибки передаются получателям пакетного вызова, но никто другой
  • Если все отписываются от пакета, наблюдаемое отменяется
  • Если все отписываются от пакета до того, как запрос будет даже запущен, он никогда не запускается
  • Наблюдаемое ведет себя как HttpClient: подписывается на наблюдаемые события нового (пакетного) запросадля данных.Абоненты могут свободно shareReplay или что-то еще, хотя.Так что никаких сюрпризов там нет.

Вот рабочая строблитц Угловая демонстрация: https://stackblitz.com/edit/angular-rxjs-batch-request

В частности, обратите внимание на поведение, когда вы «переключаете» дисплей: вы заметите, чтоповторная подписка на существующие наблюдаемые вызовет запуск новых пакетных запросов, и эти запросы будут отменены (или сразу не запущены), если вы переключите их достаточно быстро.

Вариант использования

В нашем проектемы используем это для угловых таблиц, где каждая строка должна отдельно выбирать дополнительные данные для рендеринга.Это позволяет нам:

  • разбивать все запросы на «одну страницу», не требуя специальных знаний о разбиении на страницы
  • Потенциально извлекать сразу несколько страниц, если пользователь быстро разбивает на страницы
  • повторно использовать существующие результаты, даже если размер страницы изменяется

Ограничения

Я бы не стал добавлять в это чанкинг или ограничение скорости.Поскольку наблюдаемый источник - тупой bufferTime, у вас возникают проблемы:

  • "Чанкинг" произойдет до дедупликации.Поэтому, если у вас есть 100 запросов на один идентификатор пользователя, вы в конечном итоге вызовете несколько запросов только с одним элементом
  • Если вы ограничите скорость, вы не сможете проверить свою очередь.Таким образом, вы можете получить очень длинную очередь, содержащую несколько одинаковых запросов.

Хотя это пессимистическая точка зрения.Исправить это означало бы полное заполнение с помощью механизма очереди / пакета с сохранением состояния, который на порядок сложнее.

0 голосов
/ 12 сентября 2018

Я думаю, @ Бигги прав.

Это способ, которым я понимаю проблему и чего вы хотите достичь

  1. В вашем приложении есть разные места, где вы хотите получить пользователей
  2. Вы делаетене хотите запускать запрос на выборку все время, скорее вы хотите их буферизовать и отправлять через определенный интервал времени, скажем, 1 секунда
  3. Вы хотите отменить определенный буфер и избежать этого для этого 1второй интервал: выполняется запрос на выборку пользователей
  4. В то же время, если кто-нибудь, назовем его Код в позиции X запросил пользователяи всего через несколько миллисекунд кто-то другой, т. е. Код в позиции Y отменяет весь пакет запросов, затем Код в позиции X имеетчтобы получить какой-то ответ, скажем, null
  5. More, вы можете захотеть запросить пользователя и затем передумать, если в пределах интервала времени буфера, и иизбегать этого пользователя быть (яотнюдь не обязательно, что это действительно то, чего вы хотите, но, похоже, это как-то вытекает из вашего вопроса

Если это все правда, то вам, вероятно, нужен какой-то механизм организации очередей, как предположил Багги.

Тогда может быть много реализаций такого механизма.

0 голосов
/ 28 сентября 2018

К вашему сведению, я попытался создать общую очередь пакетных задач, используя ответы @buggy & @picci:

import { Observable, Subject, BehaviorSubject, from, timer } from "rxjs"
import { catchError, share, mergeMap, map, filter, takeUntil, take, bufferTime, timeout, concatMap } from "rxjs/operators"

export interface Task<TInput> {
    uid: number
    input: TInput
}

interface ErroredTask<TInput> extends Task<TInput> {
    error: any
}

interface SucceededTask<TInput, TOutput> extends Task<TInput> {
    output: TOutput
}

export type FinishedTask<TInput, TOutput> = ErroredTask<TInput> | SucceededTask<TInput, TOutput>

const taskErrored = <TInput, TOutput>(
    taskFinished: FinishedTask<TInput, TOutput>,
): taskFinished is ErroredTask<TInput> => !!(taskFinished as ErroredTask<TInput>).error

type BatchedWorker<TInput, TOutput> = (tasks: Array<Task<TInput>>) => Observable<FinishedTask<TInput, TOutput>>

export const createSimpleBatchedWorker = <TInput, TOutput>(
    work: (inputs: TInput[]) => Observable<TOutput[]>,
    workTimeout: number,
): BatchedWorker<TInput, TOutput> => (
    tasks: Array<Task<TInput>>,
) => work(
    tasks.map((task) => task.input),
).pipe(
    mergeMap((outputs) => from(tasks.map((task, index) => ({
        ...task,
        output: outputs[index],
    })))),
    timeout(workTimeout),
    catchError((error) => from(tasks.map((task) => ({
        ...task,
        error,
    })))),
)

export const createBatchedTaskQueue = <TInput, TOutput>(
    worker: BatchedWorker<TInput, TOutput>,
    concurrencyLimit: number = 1,
    batchTimeout: number = 0,
    maxBatchSize: number = Number.POSITIVE_INFINITY,
) => {
    const taskSubject = new Subject<Task<TInput>>()
    const cancelTaskSubject = new BehaviorSubject<Set<number>>(new Set())
    const cancelTask = (task: Task<TInput>) => {
        const cancelledUids = cancelTaskSubject.getValue()
        const newCancelledUids = new Set(cancelledUids)
        newCancelledUids.add(task.uid)
        cancelTaskSubject.next(newCancelledUids)
    }
    const output$: Observable<FinishedTask<TInput, TOutput>> = taskSubject.pipe(
        bufferTime(batchTimeout, undefined, maxBatchSize),
        map((tasks) => {
          const cancelledUids = cancelTaskSubject.getValue()
          return tasks.filter((task) => !cancelledUids.has(task.uid))
        }),
        filter((tasks) => tasks.length > 0),
        mergeMap(
            (tasks) => worker(tasks).pipe(
                takeUntil(cancelTaskSubject.pipe(
                    filter((uids) => {
                        for (const task of tasks) {
                            if (!uids.has(task.uid)) {
                                return false
                            }
                        }
                        return true
                    }),
                )),
            ),
            undefined,
            concurrencyLimit,
        ),
        share(),
    )
    let nextUid = 0
    return (input$: Observable<TInput>): Observable<TOutput> => input$.pipe(
        concatMap((input) => new Observable<TOutput>((observer) => {
            const task = {
                uid: nextUid++,
                input,
            }
            const subscription = output$.pipe(
                filter((taskFinished) => taskFinished.uid === task.uid),
                take(1),
                map((taskFinished) => {
                    if (taskErrored(taskFinished)) {
                        throw taskFinished.error
                    }
                    return taskFinished.output
                }),
            ).subscribe(observer)
            subscription.add(
                timer(0).subscribe(() => taskSubject.next(task)),
            )
            return () => {
                subscription.unsubscribe()
                cancelTask(task)
            }
        })),
    )
}

В нашем примере:

import { from } from "rxjs"
import { map, toArray } from "rxjs/operators"
import { createBatchedTaskQueue, createSimpleBatchedWorker } from "mmr/components/rxjs/batched-task-queue"

const functionThatSimulateAFetch = (userIds: string[]) => from(userIds).pipe(
    map((userId) => ({ id: userId, name: "George" })),
    toArray(),
)

const userFetchQueue = createBatchedTaskQueue(
    createSimpleBatchedWorker(
        functionThatSimulateAFetch,
        10000,
    ),
)

const fetchUser = (userId: string) => {
    return from(userId).pipe(
        userFetchQueue,
    )
}

Я открыт для любых предложений по улучшению

0 голосов
/ 12 сентября 2018

Я не уверен, что это лучший способ решить эту проблему (по крайней мере, для этого нужны тесты), но я постараюсь объяснить свою точку зрения.

У нас есть 2 queue: для ожидающих и для запросов функций.
result, чтобы помочь доставке ответа / ошибки подписчикам.
Какой-то работник, работающий по определенному расписанию, берет из очереди задание для выполнения запроса.

Если я откажусь от заметки, возвращенной fetchUser до таймер bufferTime завершен, он не предотвращает выборку пользователь.

Отказ от подписки fetchUser очистит request queue, а worker ничего не сделает.

Если я откажусь от всех наблюдений, возвращенных fetchUser ранее получение пакета завершено, запрос не отменяется.

Рабочий подписывайся until isNothingRemain$

const functionThatSimulateAFetch = (userIds: string[]) => from(userIds).pipe(

  map((userId) => ({ id: userId, name: "George" })),
  toArray(),
  tap(() => console.log('API_CALL', userIds)),
  delay(200),
)

class Queue {
  queue$ = new BehaviorSubject(new Map());

  private get currentQueue() {
    return new Map(this.queue$.getValue());
  }

  add(...ids) {
    const newMap = ids.reduce((acc, id) => {
      acc.set(id, (acc.get(id) || 0) + 1);
      return acc;
    }, this.currentQueue);
    this.queue$.next(newMap);
  };

  addMap(idmap: Map<any, any>) {

    const newMap = (Array.from(idmap.keys()))
      .reduce((acc, id) => {
        acc.set(id, (acc.get(id) || 0) + idmap.get(id));
        return acc;
      }, this.currentQueue);
    this.queue$.next(newMap);
  }

  remove(...ids) {
    const newMap = ids.reduce((acc, id) => {
      acc.get(id) > 1 ? acc.set(id, acc.get(id) - 1) : acc.delete(id);
      return acc;
    }, this.currentQueue)
    this.queue$.next(newMap);
  };

  removeMap(idmap: Map<any, any>) {
    const newMap = (Array.from(idmap.keys()))
      .reduce((acc, id) => {
        acc.get(id) > idmap.get(id) ? acc.set(id, acc.get(id) - idmap.get(id)) : acc.delete(id);
        return acc;
      }, this.currentQueue)
    this.queue$.next(newMap);
  };

  has(id) {
    return this.queue$.getValue().has(id);
  }

  asObservable() {
    return this.queue$.asObservable();
  }
}

class Result {
  result$ = new BehaviorSubject({ ids: new Map(), isError: null, value: null });
  select(id) {
    return this.result$.pipe(
      filter(({ ids }) => ids.has(id)),
      switchMap(({ isError, value }) => isError ? throwError(value) : of(value.find(x => x.id === id)))
    )
  }
  add({ isError, value, ids }) {
    this.result$.next({ ids, isError, value });
  }

  clear(){
    this.result$.next({ ids: new Map(), isError: null, value: null });
  }
}

const result = new Result();
const queueToSend = new Queue();
const queuePending = new Queue();
const doRequest = new Subject();

const fetchUser = (id: string) => {
  return Observable.create(observer => {
    queueToSend.add(id);
    doRequest.next();

    const subscription = result
      .select(id)
      .pipe(take(1))
      .subscribe(observer);

    // cleanup queue after got response or unsubscribe
    return () => {
      (queueToSend.has(id) ? queueToSend : queuePending).remove(id);
      subscription.unsubscribe();
    }
  })
}


// some kind of worker that take task from queue and send requests
doRequest.asObservable().pipe(
  auditTime(1000),
  // clear outdated results
  tap(()=>result.clear()),
  withLatestFrom(queueToSend.asObservable()),
  map(([_, queue]) => queue),
  filter(ids => !!ids.size),
  mergeMap(ids => {
    // abort the request if it have no subscribers
    const isNothingRemain$ = combineLatest(queueToSend.asObservable(), queuePending.asObservable()).pipe(
      map(([queueToSendIds, queuePendingIds]) => Array.from(ids.keys()).some(k => queueToSendIds.has(k) || queuePendingIds.has(k))),
      filter(hasSameKey => !hasSameKey)
    )

    // prevent to request the same ids if previous requst is not complete
    queueToSend.removeMap(ids);
    queuePending.addMap(ids);
    return functionThatSimulateAFetch(Array.from(ids.keys())).pipe(
      map(res => ({ isErorr: false, value: res, ids })),
      takeUntil(isNothingRemain$),
      catchError(error => of({ isError: true, value: error, ids }))
    )
  }),
).subscribe(res => result.add(res))




fetchUser('1').subscribe(console.log);

const subs = fetchUser('2').subscribe(console.log);
subs.unsubscribe();

fetchUser('3').subscribe(console.log);



setTimeout(() => {
  const subs1 = fetchUser('10').subscribe(console.log);
  subs1.unsubscribe();

  const subs2 = fetchUser('11').subscribe(console.log);
  subs2.unsubscribe();
}, 2000)


setTimeout(() => {
  const subs1 = fetchUser('20').subscribe(console.log);
  subs1.unsubscribe();

  const subs21 = fetchUser('20').subscribe(console.log);
  const subs22 = fetchUser('20').subscribe(console.log);
}, 4000)


// API_CALL
// ["1", "3"]
// {id: "1", name: "George"}
// {id: "3", name: "George"}
// API_CALL
// ["20"]
// {id: "20", name: "George"}
// {id: "20", name: "George"}

Пример стекаблица

...