Оператор обтекания в RxJS, чтобы его можно было применить к материализованному потоку - PullRequest
0 голосов
/ 23 декабря 2018

Я ищу способ отследить значение 'route' среди потоковых операторов.У меня есть материализованный поток с дополнительными метаданными об объекте уведомления (например, свойство valueId).Это определение будет выглядеть примерно так:

const x = stream.pipe(
  materialize(),
  map(x => Object.assign(x, {valueId: randomInt()}))
);

Теперь мне нужно обернуть операторы, которые применяются к x.Допустим, мне нужно использовать map(x => x * 2), но я не могу сделать это так:

x.pipe(dematerialize(), map(x => x * 2))

Потому что я потеряю свои метаданные.Как мне сделать функцию обтекания, которая будет применяться к любому оператору и все равно сохранит мои дополнительные метаданные?

x.pipe(wrap(map(x => x * 2)))

Я думал о чем-то вроде этого:

function wrap<T, R>(
  operator: OperatorFunction<T, R>
): (source: Observable<TaggedValue<T>>) => Observable<TaggedValue<R>> {
  return source =>
    source.pipe(
      switchMap(x =>
        of(x).pipe(
          dematerialize(),
          operator,
          materialize(),
          map(z => Object.assign(z, { valueId: x.valueId }))
        )
      )
    );
}

Но он генерируетподдельные полные сообщения от of().Образец: https://stackblitz.com/edit/rxjs-fhv54p

Ответы [ 3 ]

0 голосов
/ 24 декабря 2018

Пока что мне пришла в голову идея, что «сохраняет» данные, используя состояние функции обтекания.Мне не очень нравится его «побочный эффект», хотя это самая безопасная реализация, которую я нашел до сих пор.

export function wrap<T, R>(
  operator: OperatorFunction<T, R>,
) => (source: Observable<TaggedValue<T>>) => Observable<TaggedValue<R>> {
  return source => {
    let metadata: { stepId: number; streamId: number; valueId: number };

    return source.pipe(
      tap(
        ({ valueId, stepId, streamId }) =>
          (metadata = { valueId, streamId, stepId: stepId + 1 }),
      ),
      dematerialize(),
      operator,
      materialize(),
      map(x => Object.assign(x, metadata, { timestamp: Date.now() })),
    );
  };
}

Но это не работает, потому что метаданные переопределяются случайным образом.

0 голосов
/ 04 января 2019

Проблема в вашем подходе с использованием of заключается в том, что полные уведомления of материализуются и передаются в виде value наблюдателю source.

Попробуйте:

function wrap<T, R>(op: OperatorFunction<T, R>):
  (source: Observable<any>) => Observable<any> {
  return source => {
    return source.pipe(
      switchMap(x => of(x)
        .pipe(
          dematerialize(),
          op,
          map(y => ({value: y, uuid: x.uuid}))
        )
      ),
      materialize(),
      map((x: any) => Object.assign(x, {
        value: x.value ? x.value.value : undefined,
        uuid: x.value ? x.value.uuid: undefined
      })),
    )
  }
}

Демо: https://stackblitz.com/edit/rxjs-gxj7zl

0 голосов
/ 24 декабря 2018

Вы можете сделать что-то вроде этого

const stream$ = stream$.pipe(map((data=> ({...data, x: data.x + 1}))))

Или переместить его в функцию переноса

const mapProp = (propName, fn) => stream$.pipe(map((data=> ({...data, [propName]: fn(data[propName])}))))

//then
const stream$ = stream$.pipe(mapProp('x', x => x+1 ))

, если хотите использовать это для чего-то другого, кроме map

const mapProp = (propName, fn) => stream$.pipe(
    mergeMap(data =>
      of(data[propName])
        .pipe(
           fn,
           map(newPropValue => ({ ...data, propName: newPropValue })
        )
    )
)

//Usage 
const stream$ = stream$.pipe(mapProp('x', map(x => x+1)))
...