Инициализация NodeJS Kafka вызывает проблемы с синхронизацией - PullRequest
0 голосов
/ 09 июля 2019

У меня есть простой кусок кода, который читает в файле JSON, используя fs.Я использую функцию обратного вызова, и после того, как у меня есть данные, я использую их для настройки другой части моего приложения.

Если я не использую тайм-аут или я помещаю обработчик событий в свой fsобратный вызов вызовет ошибку.

/**
 * 
 * consumer.js 
 * 
 * Subscribes to a kafka topic to consume
 * messages sent by our producer.
 * 
 */

// Defined constants
const kafka = require('kafka-node'),
    Consumer = kafka.Consumer,
    client = new kafka.KafkaClient(),
    utilities = require('./utilities'),
    fs = require('fs');

// Defined vars
let consumer = {};

// Load our topics to subscribe to
fs.readFile('./json/topics.json', readTopicFile);

/**
 * Handle our file load
 * @param {*} err 
 * @param {*} data 
 */
function readTopicFile(err, data) {

    // Handle file read error
    if (err) {
        utilities.logError('Error reading topic file', err);
        return;
    }

    // Define our topic data
    let obj = JSON.parse(data);

    // Define our consumer
    consumer = new Consumer(
        client,
        obj,
        {
            autoCommit: false,
            fromOffset: 'latest'
        }
    );

    // Start out consumer
    setTimeout(function () {
        startConsumer();
    }, 100);

}


/**
 * Wait for messages from our subscribed topics
 */
function startConsumer() {
    console.log('consumer waiting for data ...')
    consumer.on('message', function (message) {
        utilities.storeRecord(message); // Error in this utility function if timeout isn't in place
    });
}

Для контекста, вот мой файл утилит:

storeRecord: function (payload) {

        // Define our postgres instance
        var pg = require('pg');
        var utilities = require('./utilities');

        // Connect to the postgres instance
        pg.connect(process.env.DATABASE_URL, function (err, conn, done) {

            // watch for any connect issues
            if (err) {
                logError('Unable to connect to postgres', err);
            }

            // Check to see if we got our payload
            if (payload) {

                // Holds our payload data
                const val = JSON.parse(payload.value).data; // <<<--- Without the setTimeout on the consumer.js, this throws the error.

                // Insert the data
                const sql = 'INSERT INTO communications (firstname, lastname, age, department, campus, state) VALUES ($1, $2, $3, $4, $5, $6)';
                const values = [val.firstName, val.lastName, val.age, val.department, val.campus, val.state];

                conn.query(sql, values, (err, result) => {
                    done();
                    if (err) {
                        utilities.logError('Error inserting data into table', err);
                    } else {
                        utilities.logError('Inserted data', result);
                    }
                });

            }
        });
    }

В приведенном выше коде мой utilities.storeRecord выдает ошибку, где этоожидая получить данные от message.

Мой вопрос касается того, почему использование setTimeout устраняет проблему синхронизации.Ошибка, которую выдает мой файл утилит: TypeError: Cannot read property 'data' of null.

Судя по тому, что я могу сказать, это почти так, как если бы new Consumer не успел инициализироваться и не знает, что использует fromOffset: 'latest',Я считаю, что это так, поскольку message не должно содержать ничего в событии consumer.on('message', ....

Если я регистрирую message С таймаутом, данные не передаются.Если я регистрирую message БЕЗ тайм-аута, он перечисляет все сообщения в начале темы (потому что он не знает, что мы хотим только самую последнюю версию).

Я застрял с этим таймаутом илиЕсть ли лучший способ инициализировать Consumer, чтобы событие узнало об этом раньше?

...