В RxJs, как мне выполнить последовательность внутри карты слияния, возвращая массив результатов по порядку? - PullRequest
0 голосов
/ 22 октября 2019

У меня есть функция, которая может разбить сообщение на несколько частей. Мне нужно, чтобы эти сообщения были опубликованы для того, чтобы моя функция была доступна. Однако я не хочу, чтобы Observable блокировал другие входящие сообщения. Мое решение было бы в некоторой комбинации оператора concat внутри карты слияния, но я не могу заставить его работать должным образом

Я не уверен, что могу создать диаграмму, но вот моя попытка:

-1-2------3|->
--4--5--6|->
desired output:
[4,5,6]
[1,2,3]

Мне нужен запрос 1 для выполнения до 2 до 3 и 4 до 5 и до 6. В английском я думаю, что у меня будет наблюдаемая наблюдаемых, и я хочу, чтобы она отображалась в наблюдаемые потоки, а затем отображалась в стандартный массивдля каждого наблюдаемого выходного потока. Я просто не уверен, как именно это сделать. Я долго возился с кодом, пытаясь осмыслить то, что я только что сказал, и вот моя лучшая попытка:

    interface SendInfo {
        message: discord.Message
        content: string
        options?: discord.MessageOptions
    }
    export const sendMessage$: Subject<SendInfo> = new Subject();

    const regex = /[\s\S]{1,1980}(?:\n|$)/g;
    export const sentMessages$ = sendMessage$.pipe(
        mergeMap(
            (input: SendInfo):
            Observable<(discord.Message | discord.Message[] | null)[]> => {
                const chunks: string[] = input.content.match(regex) || [];
                const superObservable: Observable<Observable<discord.Message | discord.Message[] | null>> = concat(chunks.map(
                    (chunk: string):
                    Observable<discord.Message | discord.Message[] | null> => {
                        const bound = input.message.channel.send.bind(
                            undefined,
                            chunk,
                            input.options,
                        );
                        return Network.genericNetworkObservable<discord.Message | discord.Message[]>(
                            bound,
                        );
                    }
                ));

                return superObservable.pipe(
                    mergeMap(e => e),
                    toArray(),
                );
            }
        ),
        tap((e): void => Utils.logger.fatal(e)),
        share(),
    );

Мой вывод:

[2019-10-21T17:24:15.322] [FATAL] messageWrapper.ts:72 - [ { channel: { send: [Function] }, content: 'msg1' } ]
[2019-10-21T17:24:15.324] [FATAL] messageWrapper.ts:72 - [ { channel: { send: [Function] }, content: 'msg2' } ]
[2019-10-21T17:24:15.325] [FATAL] messageWrapper.ts:72 - [ { channel: { send: [Function] }, content: 'msg3' } ]

Я чувствуюкак будто я близок к решению, но я не могу понять, как именно объединить это в один массив. Я также не знаю, является ли это функционально правильным или нет.

Ответы [ 2 ]

1 голос
/ 22 октября 2019

Я рассмотрел вопрос сохранения порядка ранее в этом вопросе RxJS: MergeMap с сохранением порядка ввода

Так что, используя мой parallelExecute, вы можете затем уменьшить значения до массива.

parallelExecute(...yourObservables).pipe(
  reduce((results, item) => [...results, item], [])
);

здесь есть функцияrallelExecute.

const { BehaviorSubject, Subject } = rxjs;
const { filter } = rxjs.operators;

const parallelExecute = (...obs$) => {
  const subjects = obs$.map(o$ => {
    const subject$ = new BehaviorSubject();
    const sub = o$.subscribe(o => { subject$.next(o); });
    return { sub: sub, obs$: subject$.pipe(filter(val => val)) };
  });
  const subject$ = new Subject();
  sub(0);
  function sub(index) {
    const current = subjects[index];
    current.obs$.subscribe(c => {
      subject$.next(c);
      current.obs$.complete();
      current.sub.unsubscribe();
      if (index < subjects.length -1) {
        sub(index + 1);
      } else {
        subject$.complete();
      }
    });
  }
  return subject$;
}
0 голосов
/ 23 октября 2019

Я выяснил, что оператор, который я искал, - это ОбъединитьАлла (), а не toАррей () после оператора конкатат. У меня есть и другая реализация с обещаниями. Теперь я верю, что оба они должны работать, но я опубликую тот, в котором я уверен больше - это обещания.

Реализация, использующая обещания:

    export const sentMessages$ = sendMessage$.pipe(
        mergeMap(
            (input: SendInfo):
            Observable<(discord.Message | null)[]> => {
                const chunks: string[] = input.content.match(regex) || [];
                const observables: Observable<(discord.Message | null)[]>[] = chunks.map(
                    (chunk: string):
                    Observable<(discord.Message | null)[]> => {
                        const bound = input.message.channel.send.bind(
                            undefined,
                            chunk,
                            input.options,
                        );
                        // eslint-disable-next-line max-len
                        return Network.genericNetworkObservable<discord.Message | discord.Message[]>(
                            bound,
                        ).pipe(
                            // eslint-disable-next-line comma-dangle
                            map((x): (discord.Message | null)[] => [x].flatMap(
                                (t): (discord.Message | discord.Message[] | null) => t
                            ))
                        );
                    }
                );

                const promises = observables
                    .map(
                        (obs: Observable<(discord.Message | null)[]>):
                        Promise<(discord.Message | null)[]> => obs.toPromise()
                    );

                const reduced = promises
                    .reduce(async (promiseChain, currentTask):
                    Promise<(discord.Message | null)[]> => [
                        ...await promiseChain,
                        ...await currentTask,
                    ].flatMap((x): (discord.Message | null) => x));

                return from(reduced);
            }
        ),
        share(),
    );

Реализация двух чистых RxJ:

    export const sentMessages$ = sendMessage$.pipe(
        mergeMap(
            (input: SendInfo):
            Observable<(discord.Message | null)[]> => {
                const chunks: string[] = input.content.match(regex) || [];
                const observables: Observable<(discord.Message | null)[]>[] = chunks.map(
                    (chunk: string):
                    Observable<(discord.Message | null)[]> => {
                        const bound = input.message.channel.send.bind(
                            undefined,
                            chunk,
                            input.options,
                        );
                        // eslint-disable-next-line max-len
                        return Network.genericNetworkObservable<discord.Message | discord.Message[]>(
                            bound,
                        ).pipe(
                            // eslint-disable-next-line comma-dangle
                            map((x): (discord.Message | null)[] => [x].flatMap(
                                (t): (discord.Message | discord.Message[] | null) => t
                            ))
                        );
                    }
                );

                return concat(observables).pipe(
                    combineAll(),
                    map(x => x.flatMap(t => t)),
                );
            }
        ),
        share(),
    );

Я считаю, что обещания будут работать, потому что они сведены в явную цепочку. Я не уверен, что мне не хватает нюанса с оператором concat, поэтому я не уверен на 100%, что он будет работать. Тестовый стенд, который я написал, на самом деле не работает должным образом из-за неправильного понимания того, как выполняются обещания с оператором отсрочки в RxJ, но я получаю ожидаемый порядок в соответствии с моим стендом. Я считаю, что мое недоразумение стало причиной того, что я не смог придумать эти решения легко.

[2019-10-23T06:09:13.948] [FATAL] messageWrapper.ts:109 - [
  { channel: { send: [Function] }, content: 'msg7' },
  { channel: { send: [Function] }, content: 'msg8' },
  { channel: { send: [Function] }, content: 'msg9' }
]
[2019-10-23T06:09:14.243] [FATAL] messageWrapper.ts:109 - [
  { channel: { send: [Function] }, content: 'msg4' },
  { channel: { send: [Function] }, content: 'msg5' },
  { channel: { send: [Function] }, content: 'msg6' }
]
[2019-10-23T06:09:14.640] [FATAL] messageWrapper.ts:109 - [
  { channel: { send: [Function] }, content: 'msg1' },
  { channel: { send: [Function] }, content: 'msg2' },
  { channel: { send: [Function] }, content: 'msg3' }
]
      ✓ should execute concurrently. (753ms)
...