Node.js: управляемый параллельный цикл while - PullRequest
1 голос
/ 22 мая 2019

У меня в коллекции mongodb 1,7 миллиона записей. Каждая запись представляет собой идентификационный номер. Мне нужно прочитать каждый идентификационный номер, сделать несколько запросов к другой службе, преобразовать данные, записать их в другую коллекцию и удалить исходную идентификационную запись, если все прошло успешно.

Мне нужен сценарий, который выполняет эти действия до бесконечности, пока коллекция не станет пустой, с настраиваемым параллелизмом (т.е. максимум 3 запроса в любое время).

Обычно я бы использовал Bluebird map, который может указывать количество одновременных обещаний, но нет входного массива (если бы я не считывал все входные записи в память, что я не собираюсь делать).

То, что я хочу, это, по сути, параллельный цикл while, т. Е. (Псевдо-javascript)

promiseWhile(queueNotEmpty, 3){
  readFromQueue
    .then(doc => {
      return process(doc);
    })
    .then(result => {
      if(result == "empty") // (or whatever)
        queueNotEmpty = false;
    });
} 

Ответы [ 2 ]

0 голосов
/ 23 мая 2019

Недавно мне пришлось сделать что-то подобное дважды.Моя проблема не включала столько записей, и мне нужно было объединить результаты в единую структуру данных, но я думаю, что это может стать началом к ​​тому, что вы ищете.

Обобщение этих решений дает мнекак-то так:

// processQueue :: ((Number -> Promise [a]), Number, (a -> b), (c, [b]) -> c), c) -> Promise c 
const processQueue = (queue, count, process, combine, init) =>
  queue (count)
    .then ( items => items .map (process) )
    .then ( promises => Promise .all (promises) )
    .then ( curr => curr .length
      ? processQueue ( queue, count, process, combine, combine(init, curr) )
      : combine (init, curr)
    )

Это принимает пять параметров:

  • queue - это функция, которая принимает число и возвращает обещание для списка значений
  • count - это число
  • process - это функция, которая преобразует одно из этих значений в другой тип
  • combine - это функция, которая объединяет целевой тип и списокэтот второй тип в целевой тип
  • init является начальным значением для редуктора

Возвращает обещание для значения этого целевого типа.

Я не могу по-настоящему продемонстрировать вашу инфраструктуру, но создать простой пример несложно.Сначала мы можем написать фиктивную функцию queue, которая возвращает обещания для групп до n элементов, пока их больше нет, а затем обещание для пустого списка.Вот глупая версия:

const queue = ((v) => (count) => Promise .resolve (
  Array .from ( {length: Math .min (count, 10 - v + 1) }, () => ( { id: v++ } ))
)) (1)
 
queue (3) .then (console.log) //~> [{id: 1}, {id: 2}, {id: 3}]
queue (3) .then (console.log) //~> [{id: 4}, {id: 5}, {id: 6}]
queue (3) .then (console.log) //~> [{id: 7}, {id: 8}, {id: 9}]
queue (3) .then (console.log) //~> [{id: 10}]
queue (3) .then (console.log) //~> [] // (and will forevermore return the empty list)

Затем мы можем объединить его с функцией, которая обрабатывает один элемент, редуктором, который просто объединяет массивы, и пустым массивом для начала, чтобы получить что-то вроде этого:

const processQueue = (queue, count, process, combine, init) =>
  queue (count)
    .then ( items => items .map (process) )
    .then ( promises => Promise .all (promises) )
    .then ( curr => curr .length
      ? processQueue ( queue, count, process, combine, combine(init, curr) )
      : combine (init, curr)
    )

const queue = ((v) => (count) => Promise.resolve (
  Array .from ( {length: Math .min (count, 10 - v + 1) }, () => ( { id: v++ } ))
)) (1)


processQueue(
  queue, 
  3, 
  ( {id} ) => ( {square: id * id} ),
  (a, b) => a .concat (b),
  []
) .then (console.log)
//~> [{square: 1}, {square: 4}, {square: 9}, ..., {square: 100}]

Хотя на первый взгляд кажется, что могут возникнуть проблемы с глубиной рекурсии, мы оставляем наши текущие кадры стека в каждом .then(...).Вы можете увидеть, что это все еще работает, если вы замените 10 в queue на 100000.(У меня не хватило терпения на миллион!)

В вашем случае, если вам не нужно ничего возвращать из вашей функции обработки, и, следовательно, вам не нужно делать какие-либо объединения в редукторфункция, это может быть немного упрощено.Но если вам нужно что-то там делать, даже просто сообщать о количестве успешных преобразований в сравнении с ошибками, тогда эта полная версия может быть уместной.


Теперь я собираюсь исправить свой недавний код, чтобы использовать эту абстракцию...

0 голосов
/ 22 мая 2019

Вы можете использовать курсор mongodb для асинхронной итерации по всем записям. Для того, чтобы над ней работали три рабочих, оберните задачу в асинхронную функцию и вызовите ее несколько раз:

 const cursor = db.collection("records").find({});

 async function process() {
   while(await cursor.hasNext()) {
     const record = await cursor.next();
     //...
   }
 }

 await Promise.all([ process(), process(), process() ]);

(я не уверен, поддерживает ли драйвер mongodb одновременные вызовы на .next(), вы должны это проверить)


В противном случае реализация этого семафора может помочь:

 function Semaphore(count = 1) {
  const resolvers = [];
  let startCount = count;

   return {
     aquire() {
       return new Promise(resolve => {
         if(startCount) { resolve(); startCount -= 1; }
         else resolvers.push(resolve);
       });
     },
     free() { 
       if(resolvers.length) resolvers.pop()(); 
       else startCount += 1;
     },
     async use(cb) { 
       await this.aquire(); 
       await cb(); 
       this.free() 
     },
     async done() {
       await Promise.all(Array.from({ length: count }, () => this.aquire()));
       startCount = count;
     },
   };
 }

Демонстрация работы В вашем случае это можно использовать как:

 const connectionSemaphore = Semaphore(3);

 (async fuction() {
    while(await cursor.hasNext()) {
      const record = await cursor.next();
      /*await*/ connectionSemaphore.use(async () => {
        // Do connection stuff concurrently
      });
    }

    await connectionSemaphore.done();
 })();
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...