Как разделить поток на несколько потоков в последовательности - PullRequest
1 голос
/ 15 апреля 2019

Использование RxJS 6,

У меня есть поток произвольных данных:

[in] -> a, b, c, d, e, f, g, h, i, ....

Я хочу разбить его на фиксированное количество N потоков в чередующемся порядке (в данном случае 3 выходных потока):

[out] -> stream1 -> a, d, g
      -> stream2 -> b, e, h
      -> stream3 -> c, f, i

или более просто:

a => stream1
b => stream2
c => stream3
d => stream1
e => stream2
f => stream3
g => stream1
h => stream2
i => stream3

Кто-нибудь знает, как я могу это сделать?

1 Ответ

0 голосов
/ 15 апреля 2019

Вы можете перебрать N и разделить ваш поток на два на каждой итерации, используя partition:

import { from, merge } from 'rxjs';
import { partition, map } from 'rxjs/operators';

const source = from(['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i']);

function split(source, n) {
  const streams = [];
  let toSplit = source;
  for (let k = n; k > 0; --k) {
    const [stream, rest] = toSplit.pipe(
      partition((_, i) => i % k === 0)
    );
    streams.push(stream);
    toSplit = rest;
  }
  return streams;
}

const obs = split(source, 3);

const subscribe = merge(
  obs[0].pipe(map(val => `1: ${val}`)),
  obs[1].pipe(map(val => `2: ${val}`)),
  obs[2].pipe(map(val => `3: ${val}`)),
).subscribe(val => console.log(val));

См. Этот пример StackBlitz.

...