Обещание закрытия в цикле - PullRequest
0 голосов
/ 23 июня 2018

Я получаю строки данных каждую секунду от Кафки. Для каждой партии данных я вставляю в свою базу данных.

Мое приложение продолжает читать последние message и id каждой партии. Проблема заключается в том, что обещания не выполняются последовательно, а выполняются одновременно после завершения одного пакета, и они продолжают читать одинаковые message и id. Я хочу, чтобы каждое обещание имело свои собственные message и id, как определено порядком, в котором они поступили из цикла for в первой функции.

Я думаю, что мне нужно использовать замыкания, однако я не уверен, как их применить здесь. Я не хочу использовать таймеры!

Спасибо!

// This is live data, coming in concurrently, forever. Promises from previous batch must be resolved before the next batch is received.
batchOfRows.on('message', function (data) {
    for (var i = 0; i < batchOfRows.rows.length; i++) {
        validate(batchOfRows.rows[i])
            .then(result => console.log(result))
            .catch(error => console.log(error));
    }
});

// For each row received, give it an ID and then insert into the DB
function validate(data) {
    return new Promise((resolve, reject) => {
        message = data;
        id = message.date + message.location
        DB.execute('select * from table1 where id = ?', id) // This is a promise function provided by the database driver (Cassandra)
            .then(result => {
                // Insert into the table at this ID
                insertIntoDB(message, id)
                    .then(result => resolve(result))
                    .catch(error => reject(error));
            })
            .catch(error => {
                reject(error);
            });
    });
}

// Inserting into DB
function insertIntoDB(message, id) {
    return new Promise((resolve, reject) => {
        query = "insert into table2 where id = ? and messageBody = ?";

        DB.execute(query, [id, JSON.Stringify(message)])
            .then(result => resolve("Successfully inserted message ID " + id))
            .catch(error => reject("Error inserting!"));
    });
}

РЕДАКТИРОВАТЬ (решение Данх):

var kafka = require('kafka-node');
client = new kafka.Client("localhost:2181"), Consumer = kafka.Consumer;
// This is like an event listener.
batchOfRows = new Consumer(
    client, [{
        topic: 'my_topic',
        partition: 0,
        offset: 0
    }], {
        fromOffset: false
    }
);

let results = [];
let promises = Promise.resolve();

function processQueue() {
    queue.forEach(element => {
        promises = promises.then(element.map(processElement)).then(elementResult => {
            // results.push(elementResult); // Don't want result to increase in size! I have put this inside insertDB then I clear it below
            console.log(results.length); // First received batch prints: 0. Second received batch prints 72. Third received batch prints 75
            results = [];  
            queue.shift();
        });
    });
}

batchOfRows.on('message', function (data) {
    console.log(batchOfRows.value.length); // First received batch prints: 72. Second received batch prints 75. Third received batch prints 76
    queue.push(batchOfRows.rows);
    processQueue();
});

function processElement(data) {
    const id = data.date + data.location
    return  DB.execute('select * from table1 where id = ?', id)
              .then(result => insertIntoDB(data, id).then(() => result));
}

function insertIntoDB(message, id) {
    const query = "insert into table2 where id = ? and messageBody = ?";
    return DB.execute(query, [id, JSON.Stringify(message)])
        .then(result => {
            // Pushing the result here
            results.push(result); // Seems like it does not push the results from the first batch from batchOfRows until it receives the second batch
            console.log("Test") // On the first batch prints "Test" 72 times right away
        });
}

EDIT Я слегка изменил функцию processQueue, добавив element.map (processUpdate), потому что пакеты, полученные из batchOfRows, фактически являются массивами, и мне нужно выполнить этот запрос БД для каждого элемента в этом массиве.

Я также удалил results.push (elementResult), потому что elementResult на самом деле по какой-то причине не определен. Я переместил results.push (elementResult) в insertIntoDB и назвал его result.push (result). Это может быть причиной возникновения ошибки (я не знаю, как вернуть результат из insertIntoDB обратно в вызывающую функцию обещания processQueue).

Если вы взгляните на insertIntoDB, если я console.log ("test"), он напечатает test столько же раз, сколько элементов в массиве batchOfRows, что означает, что он разрешил все обещания в этом пакете. Таким образом, в первом пакете / сообщении, если есть 72 строки, будет напечатано «Тест» 72 раза. Но если я изменю этот console.log («Test») на просто results.push (result) или даже results.push («test»), а затем напечатаю result.length, он все равно даст мне 0, пока второй пакет не завершится хотя я ожидаю, что длина будет 72.

Ответы [ 2 ]

0 голосов
/ 23 июня 2018

Может быть полезно немного абстрагировать идеи и представить их явно в данных (а не в данных, которые косвенно сохраняются в обещаниях).Начните с очереди:

let queue = [];

Добавьте материал в очередь с помощью queue.push(element) и получите и удалите в порядке прибытия с element = queue.shift()

Наша цель - обработать все, что находится в очереди, в порядке, сохраняя результаты в порядке.Сама обработка асинхронная, и мы хотим завершить один элемент очереди перед началом следующего, поэтому нам нужна цепочка обещаний (называемая promises) для обработки очереди:

let results = [];
let promises = Promise.resolve();

function processQueue() {
    queue.forEach(element => {
        promises = promises.then(processElement(element)).then(elementResult => {
            results.push(elementResult);
            queue.shift();
        });
    });
}

Мы можем убедить себячто это правильно, даже не задумываясь о том, что делает processElement(), пока это возвращает обещание.(В случае OP это обещание является обещанием иметь дело с массивом «строк»).processElement() все сделает, и результат (массив результатов в случае OP) будет переведен в results.

Уверен, что порядок операций имеет смысл, когда приходит новый пакет,добавьте его в очередь, а затем обработайте все, что находится в очереди:

batchOfRows.on('message', function (data) {
    queue.push(batchOfRows.rows);
    processQueue();
});

Нам просто нужно определить processElement().Воспользуйтесь полезными советами @ YuryTarabanko для этого здесь (и оставьте его ответ помеченным как правильный, IMO)

function processElement(data) {
    const id = data.date + data.location
    return  DB.execute('select * from table1 where id = ?', id)
              .then(result => insertIntoDB(data, id).then(() => result));
}

function insertIntoDB(message, id) {
    const query = "insert into table2 where id = ? and messageBody = ?";
    return DB.execute(query, [id, JSON.Stringify(message)])
}

Одним из приятных побочных эффектов этого является то, что вы можете измерять прогресс.Если входные данные поступают слишком быстро, то выражение:

queue.length - results.length

... будет расти со временем.

EDIT Глядя на новый код, я озадаченпочему запрос выполняется для каждой строки (каждый элемент в batchOfRows.rows).Поскольку результат этого запроса игнорируется, не делайте этого ...

function processElement(data) {
    const id = data.date + data.location
    // we know everything we need to know to call insert (data and id)
    // just call it and return what it returns :-)
    return insertIntoDB(data, id);
}

Теперь я понимаю, что это будет длительная задача, и она не должна накапливать результаты (даже линейно),Более чистое решение для этого - удалить все ссылки на массив results, который я предложил.Минимальная версия insert просто вставляет и возвращает результат вставки ...

function insertIntoDB(message, id) {
    const query = "insert into table2 where id = ? and messageBody = ?";
    return DB.execute(query, [id, JSON.Stringify(message)]);
}

Я думаю, что вы добавили некоторый код для регистрации результатов (лучший тест, который сработал бы, это проверить базу данных через некоторыевне процесса, но если вы хотите войти, просто запомните pass-through значение результата после регистрации.

anyPromise.then(result => {
    console.log(result);
    return result;  // IMPORTANT
})
0 голосов
/ 23 июня 2018

В вашем коде есть различные антипаттерны. Во-первых, вам не нужно вручную создавать обещание, скорее всего, вам никогда не придется звонить new Promise. Во-вторых, вы нарушаете цепочку обещаний, не возвращая вложенное обещание из обработчика onFulfill. И, наконец, вы загрязняете глобальную область видимости, когда не объявляете переменные id = message.date + message.location

// This is live data, coming in concurrently, forever. Promises from previous batch must be resolved before the next batch is received.
let pending = Promise.resolve([]); // previous batch starting w/ resolved promise
batchOfRows.on('message', function (data) {
    // not sure where was batchRows comming from in your code
    const nextBatch = () => Promise.all(
      data.batchOfRows.rows.map(validate)
    );

    // reassign pending to a new promise
    // whatever happend to previous promise we keep running
    pending = pending
      .then(nextBatch)
      .catch(e => console.error(e))
});

// For each row received, give it an ID and then insert into the DB
function validate(data) {
    const id = data.date + data.location
    return  DB.execute('select * from table1 where id = ?', id)
              .then(result => insertIntoDB(data, id).then(() => result));
}

// Inserting into DB
function insertIntoDB(message, id) {
    const query = "insert into table2 where id = ? and messageBody = ?";
    return DB.execute(query, [id, JSON.Stringify(message)])
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...