RxJS Выполняем несколько асинхронных операций и ждем их завершения в порядке выполнения - PullRequest
0 голосов
/ 08 июля 2019
/// ----------w1-----w2-----------------

/// -----------------------C1---------C2

Существует буфер записи, в который можно вводить команду после завершения вызова обратного вызова, связанного с fn.

Требуется, чтобы команда записи выполнялась последовательно, например, для. если команда w1 выполнена, и после этого w2 , w2 должна быть выполнена после завершения w1 . w1 и w2 должны вызвать соответствующую функцию завершения C1 , C2 после завершения.

Он реализован с использованием RxJ, работает в соответствии со спецификацией, но есть некоторые болевые точки. Вместо использования обратного вызова я просто использую await, чтобы сделать поток кода линейным.

function atomicWrite$(time, cmd) {
  return timer(time).pipe(
    take(1),
    tap(() => console.log('done cmd: ', cmd))
  );
}


const commandList = [];
let signalNextWrite$ = new ReplaySubject(1);

async function monsterProcess$(index, time) {

  console.log('entered loop', index);
  await signalNextWrite$.pipe(filter(idx => index === idx), take(1)).toPromise();

  console.log('processing start:', index, time);

  await of(commandList[index]).pipe(
    concatMap(cmd => atomicWrite$(time, cmd))
  ).toPromise();

  console.log('processing end:', index, time);

  signalNextWrite$.next(index + 1);

  return EMPTY.toPromise();
}

function writeIssue(command, time) {
  commandList.push(command);
  return monsterProcess$(commandList.length - 1, time);
}

signalNextWrite$.next(0);


setTimeout(async () => {
  const res = await writeIssue('windows xp2', 5000);
  console.log('completed windows');
});

setTimeout(async () => {
  const res = await writeIssue('mac siera', 1000);
  console.log('completed mac');
});

Вопрос вместо signalNextWrite$, может ли быть какое-то время как мьютекс, который будет ждать, пока блокировка разблокирована, а затем продолжит. Здесь индекс (idx) команды, выданной в качестве фильтра, который блокирует выполнение другой выданной команды записи.

Хотел отойти от индекса массива в качестве логики блокировки и вместо этого использовать какую-то другую логику для ожидания завершения текущей команды и оповещения о завершении.

Также хотите избежать этого кода также:

signalNextWrite$.next(0);

Stackblitz напр. https://stackblitz.com/edit/typescript-r45p2d

...