Может быть полезно немного абстрагировать идеи и представить их явно в данных (а не в данных, которые косвенно сохраняются в обещаниях).Начните с очереди:
let queue = [];
Добавьте материал в очередь с помощью queue.push(element)
и получите и удалите в порядке прибытия с element = queue.shift()
Наша цель - обработать все, что находится в очереди, в порядке, сохраняя результаты в порядке.Сама обработка асинхронная, и мы хотим завершить один элемент очереди перед началом следующего, поэтому нам нужна цепочка обещаний (называемая promises
) для обработки очереди:
let results = [];
let promises = Promise.resolve();
function processQueue() {
queue.forEach(element => {
promises = promises.then(processElement(element)).then(elementResult => {
results.push(elementResult);
queue.shift();
});
});
}
Мы можем убедить себячто это правильно, даже не задумываясь о том, что делает processElement()
, пока это возвращает обещание.(В случае OP это обещание является обещанием иметь дело с массивом «строк»).processElement()
все сделает, и результат (массив результатов в случае OP) будет переведен в results
.
Уверен, что порядок операций имеет смысл, когда приходит новый пакет,добавьте его в очередь, а затем обработайте все, что находится в очереди:
batchOfRows.on('message', function (data) {
queue.push(batchOfRows.rows);
processQueue();
});
Нам просто нужно определить processElement()
.Воспользуйтесь полезными советами @ YuryTarabanko для этого здесь (и оставьте его ответ помеченным как правильный, IMO)
function processElement(data) {
const id = data.date + data.location
return DB.execute('select * from table1 where id = ?', id)
.then(result => insertIntoDB(data, id).then(() => result));
}
function insertIntoDB(message, id) {
const query = "insert into table2 where id = ? and messageBody = ?";
return DB.execute(query, [id, JSON.Stringify(message)])
}
Одним из приятных побочных эффектов этого является то, что вы можете измерять прогресс.Если входные данные поступают слишком быстро, то выражение:
queue.length - results.length
... будет расти со временем.
EDIT Глядя на новый код, я озадаченпочему запрос выполняется для каждой строки (каждый элемент в batchOfRows.rows
).Поскольку результат этого запроса игнорируется, не делайте этого ...
function processElement(data) {
const id = data.date + data.location
// we know everything we need to know to call insert (data and id)
// just call it and return what it returns :-)
return insertIntoDB(data, id);
}
Теперь я понимаю, что это будет длительная задача, и она не должна накапливать результаты (даже линейно),Более чистое решение для этого - удалить все ссылки на массив results
, который я предложил.Минимальная версия insert просто вставляет и возвращает результат вставки ...
function insertIntoDB(message, id) {
const query = "insert into table2 where id = ? and messageBody = ?";
return DB.execute(query, [id, JSON.Stringify(message)]);
}
Я думаю, что вы добавили некоторый код для регистрации результатов (лучший тест, который сработал бы, это проверить базу данных через некоторыевне процесса, но если вы хотите войти, просто запомните pass-through значение результата после регистрации.
anyPromise.then(result => {
console.log(result);
return result; // IMPORTANT
})