Rx JS Оператор для составления наблюдаемых в состояние? - PullRequest
1 голос
/ 26 апреля 2020

В настоящее время я сталкиваюсь с довольно распространенной проблемой, когда дело доходит до асинхронного выполнения, обработки состояний и ада отступов.

Скажем, есть вызов REST, который запрашивает добавление пользовательской информации (user_info) в пользователь, уведомить своих контактов об этом изменении и вернуть введенную или измененную информацию пользователя в ответ. Но пользователь object знает все идентификаторы своих user_info объектов только через отношение 1 к n.

Последовательность вызовов:

request -> saveUserInfo -> updateUser -> notifyContacts -> response

saveToDb, updateUser и notifyContacts - это все функции, которые выполняют асинхронный побочный эффект и не могут быть просто скомпонованы, поскольку все они имеют разные входы и выходы и зависят от порядка выполнения.

Здесь приведено слабое представление заголовков функций

function saveUserInfo(payload): Promise<UserInfo[]> { /*...*/ }
function updateUser(user_id, user_infos): Promise<User> { /*...*/ }
function sendNotification(user, user_infos): Promise<void> { /*...*/ }

Чтобы написать этот обработчик запросов, я в настоящее время много работаю с mergeMap, чтобы подписаться на внутренние наблюдаемые, которые выполняли текущее асинхронное действие. Это выглядело примерно так:

function handleRequest(payload, user_id) {
  return saveUserInfo(payload).pipe(
    mergeMap(user_infos =>
      from(updateUser(user_id, user_infos)).pipe(
        mergeMap(user =>
          from(notifyContacts(user, user_infos)).pipe(map(() => user_infos))
        )
      )
    )
  )
}

Я не был удовлетворен этим решением, потому что в будущем будет добавлено больше логики c, и я знаю, что в какой-то момент это может стать громоздким.

Итак, я просмотрел документацию по Rx JS и, к сожалению, не нашел умного способа соединить эти асинхронные вызовы вместе гораздо более приятным способом. Самая большая проблема в том, что я не могу полностью сформулировать проблему здесь в нескольких словах, что могло бы привести меня к написанию этого вопроса.

Кто-нибудь знает лучшее решение этого вопроса? Я уверен, что должно быть что-то!

Желательно без вспомогательных функций и в чистом Rx JS.

Ответы [ 3 ]

1 голос
/ 26 апреля 2020

Когда вы используете оператор отображения высокого порядка (например, mergeMap и его друзья) и если предоставленный обратный вызов возвращает обещание , вам не нужно делать () => from(promise). mergeMap автоматически сделает это за вас.

Имея это в виду, я бы изменил это:

function handleRequest(payload, user_id) {
  return saveUserInfo(payload).pipe(
    mergeMap(user_infos =>
      from(updateUser(user_id, user_infos)).pipe(
        mergeMap(user =>
          from(notifyContacts(user, user_infos)).pipe(map(() => user_infos))
        )
      )
    )
  )
}

на это

function handleRequest(payload, user_id) {
  return from(saveUserInfo(payload)) // `saveUserInfo(payload)` returns a promise
    .pipe(
      mergeMap(user_infos => forkJoin(of({ user_infos }), updateUser(user_id, user_infos)),
      mergeMap(([acc, user]) => forkJoin(of({ ...acc, otherPropsHere: true }), notifyContacts(user, user_infos))),
      map(([acc]) => acc['user_infos'])
    ),
  )
}

forkJoin будет выдавать массив после того, как все его предоставленные ObservableInput s отправят хотя бы одно значение и будут завершены.

Как видите, первый элемент возвращенного массива это аккумулятор.

1 голос
/ 26 апреля 2020

Так как я не смог найти ничего полезного для inte rnet, я предложил функцию более высокого порядка, чтобы предоставить мне конвейерный оператор, который мог бы потенциально решить мою проблему, я назвал его sourceMap.

Он должен подписаться на внутреннюю наблюдаемую через mergeMap и объединить результат этого объекта в объект состояния ключа.

const sourceMap = <S, T extends Record<string, any>>(
  predicate: (acc: T) => Promise<S> | Observable<S>,
  key?: keyof T
) => {
  return mergeMap((acc: T) =>
    from(predicate(acc)).pipe(map(res => (key ? { ...acc, [key]: res } : acc)))
  )
}

Реализация выглядит следующим образом:

function handleRequest(payload, user_id): Observable<UserInfo[]> {
  const state$ = of({ user_infos: [] as UserInfo[], user: {} as User })

 return state$.pipe(
    sourceMap(() => saveUserInfo(user_id, payload), 'user_info'),
    sourceMap(({ user_info }) => updateUser(user_id, user_info), 'user'),
    sourceMap(({ user_info, user }) => notifyContacts(user, user_info)),
    pluck('user_info')
  )
}

Still ищем лучшее решение для этого!

0 голосов
/ 26 апреля 2020

Вы также можете сделать это так:

function saveUserInfo(payload: Payload): Promise<UserInfo[]>;
function updateUser(user_id: string, user_infos: UserInfo[]): Promise<User>;
function sendNotification(user: User, user_infos: UserInfo[]): Promise<void>;

function handleRequest(payload: Payload, user_id: string): Observable<UserInfo[]> {
  return from(saveUserInfo(payload)).pipe(
    switchMap(user_infos => updateUser(user_id, user_infos).then(user => ({ user, user_infos }))),
    switchMap(({ user, user_infos }) => sendNotification(user, user_infos).then(() => user_infos))
  );
}
...