Вызовите массив обещаний параллельно, но разрешите их по порядку, не дожидаясь разрешения других обещаний - PullRequest
0 голосов
/ 10 февраля 2019

У меня есть массив обещаний, которые я хотел бы вызывать параллельно, но разрешать синхронно.

Я сделал этот фрагмент кода для выполнения требуемой задачи, однако мне нужно было создать свой собственный объект QueryablePromise, чтобы обернуть нативный Promise, который я могу синхронно проверить, чтобы увидеть его разрешенный статус.

Есть ли лучший способ выполнить эту задачу, не требующий специального объекта?

Обратите внимание.Я не хочу использовать Promise.all, поскольку не хочу ждать разрешения всех обещаний, прежде чем обрабатывать эффекты обещаний.И я не могу использовать функции async в моей кодовой базе.

const PROMISE = Symbol('PROMISE')
const tap = fn => x => (fn(x), x)

class QueryablePromise {
  resolved = false
  rejected = false
  fulfilled = false
  constructor(fn) {
    this[PROMISE] = new Promise(fn)
      .then(tap(() => {
        this.fulfilled = true
        this.resolved = true
      }))
      .catch(x => {
        this.fulfilled = true
        this.rejected = true
        throw x
      })
  }
  then(fn) {
    this[PROMISE].then(fn)
    return this
  }
  catch(fn) {
    this[PROMISE].catch(fn)
    return this
  }
  static resolve(x) {
    return new QueryablePromise((res) => res(x))
  }
  static reject(x) {
    return new QueryablePromise((_, rej) => rej(x))
  }
}

/**
 * parallelPromiseSynchronousResolve
 *
 * Call array of promises in parallel but resolve them in order
 *
 * @param  {Array<QueryablePromise>}  promises
 * @praram {Array<fn>|fn}  array of resolver function or single resolve function
 */
function parallelPromiseSynchronousResolve(promises, resolver) {
  let lastResolvedIndex = 0
  const resolvePromises = (promise, i) => {
    promise.then(tap(x => {
      // loop through all the promises starting at the lastResolvedIndex
      for (; lastResolvedIndex < promises.length; lastResolvedIndex++) {
        // if promise at the current index isn't resolved break the loop
        if (!promises[lastResolvedIndex].resolved) {
          break
        }
        // resolve the promise with the correct resolve function
        promises[lastResolvedIndex].then(
          Array.isArray(resolver)
            ? resolver[lastResolvedIndex]
            : resolver
        )
      }
    }))
  }
  
  promises.forEach(resolvePromises)
}

const timedPromise = (delay, label) => 
  new QueryablePromise(res => 
    setTimeout(() => {
      console.log(label)
      res(label)
    }, delay)
  )

parallelPromiseSynchronousResolve([
  timedPromise(20, 'called first promise'),
  timedPromise(60, 'called second promise'),
  timedPromise(40, 'called third promise'),
], [
  x => console.log('resolved first promise'),
  x => console.log('resolved second promise'),
  x => console.log('resolved third promise'),
])
<script src="https://codepen.io/synthet1c/pen/KyQQmL.js"></script>

Ура за любую помощь.

Ответы [ 2 ]

0 голосов
/ 11 февраля 2019

Я бы порекомендовал действительно использовать Promise.all - но не на всех обещаниях сразу, а на всех обещаниях, которые вы хотите выполнить для каждого шага.Вы можете создать этот «древовидный список» обещаний с помощью reduce:

function parallelPromisesSequentialReduce(promises, reducer, initial) {
  return promises.reduce((acc, promise, i) => {
    return Promise.all([acc, promise]).then(([prev, res]) => reducer(prev, res, i));
  }, Promise.resolve(initial));
}

const timedPromise = (delay, label) => new Promise(resolve =>
  setTimeout(() => {
    console.log('fulfilled ' + label + ' promise');
    resolve(label);
  }, delay)
);

parallelPromisesSequentialReduce([
  timedPromise(20, 'first'),
  timedPromise(60, 'second'),
  timedPromise(40, 'third'),
], (acc, res) => {
  console.log('combining ' + res + ' promise with previous result (' + acc + ')');
  acc.push(res);
  return acc;
}, []).then(res => {
  console.log('final result', res);
}, console.error);
0 голосов
/ 11 февраля 2019

Используя цикл for await...of, вы можете сделать это довольно хорошо, если у вас уже есть массив обещаний:

const delay = ms => new Promise(resolve => { setTimeout(resolve, ms); });
const range = (length, mapFn) => Array.from({ length }, (_, index) => mapFn(index));

(async () => {
  const promises = range(5, index => {
    const ms = Math.round(Math.random() * 5000);
    return delay(ms).then(() => ({ ms, index }));
  });

  const start = Date.now();

  for await (const { ms, index } of promises) {
    console.log(`index ${index} resolved at ${ms}, consumed at ${Date.now() - start}`);
  }
})();

Поскольку вы не можете использовать асинхронные функции, вы можете имитировать эффект for await...of, объединяя в цепочку обещания, используя Array.prototype.reduce(), иСинхронное планирование обратного вызова для каждой цепочки:

const delay = ms => new Promise(resolve => { setTimeout(resolve, ms); });
const range = (length, mapFn) => Array.from({ length }, (_, index) => mapFn(index));

const asyncForEach = (array, cb) => array.reduce(
  (chain, promise, index) => chain.then(
    () => promise
  ).then(
    value => cb(value, index)
  ),
  Promise.resolve()
);

const promises = range(5, index => {
  const ms = Math.round(Math.random() * 5000);
  return delay(ms).then(() => ms);
});

const start = Date.now();

asyncForEach(promises, (ms, index) => {
  console.log(`index ${index} resolved at ${ms}, consumed at ${Date.now() - start}`);
});

Обработка ошибок

Поскольку было заявлено, что обещания создаются параллельно, я предполагаю, что ошибки в каждом отдельном обещании не будут распространяться на другие обещанияза исключением любых потенциально хрупких цепочек, построенных с помощью asyncForEach() (как выше).

Но мы также хотим избежать перекрестного распространения ошибок между обещаниями при объединении их в цепочку в asyncForEach().Вот способ надежно планировать обратные вызовы ошибок, когда ошибки могут распространяться только из исходных обещаний:

const delay = ms => new Promise(resolve => { setTimeout(resolve, ms); });
const maybe = p => p.then(v => Math.random() < 0.5 ? Promise.reject(v) : v);
const range = (length, mapFn) => Array.from({ length }, (_, index) => mapFn(index));

const asyncForEach = (array, fulfilled, rejected = () => {}) => array.reduce(
  (chain, promise, index) => {
    promise.catch(() => {}); // catch early rejection until handled below by chain
    return chain.then(
      () => promise,
      () => promise // catch rejected chain and settle with promise at index
    ).then(
      value => fulfilled(value, index),
      error => rejected(error, index)
    );
  },
  Promise.resolve()
);

const promises = range(5, index => {
  const ms = Math.round(Math.random() * 5000);
  return maybe(delay(ms).then(() => ms)); // promises can fulfill or reject
});

const start = Date.now();

const settled = state => (ms, index) => {
  console.log(`index ${index} ${state}ed at ${ms}, consumed at ${Date.now() - start}`);
};

asyncForEach(
  promises,
  settled('fulfill'),
  settled('reject') // indexed callback for rejected state
);

Единственное предостережение, которое следует здесь отметить, заключается в том, что любые ошибки, сгенерированные в обратных вызовах, переданных asyncForEach(), будут поглощены обработкой ошибок в цепочке, за исключением ошибок, возникающих вобратные вызовы последнего индекса массива.

...