У меня есть простой кусок кода, который читает в файле JSON, используя fs
.Я использую функцию обратного вызова, и после того, как у меня есть данные, я использую их для настройки другой части моего приложения.
Если я не использую тайм-аут или я помещаю обработчик событий в свой fs
обратный вызов вызовет ошибку.
/**
*
* consumer.js
*
* Subscribes to a kafka topic to consume
* messages sent by our producer.
*
*/
// Defined constants
const kafka = require('kafka-node'),
Consumer = kafka.Consumer,
client = new kafka.KafkaClient(),
utilities = require('./utilities'),
fs = require('fs');
// Defined vars
let consumer = {};
// Load our topics to subscribe to
fs.readFile('./json/topics.json', readTopicFile);
/**
* Handle our file load
* @param {*} err
* @param {*} data
*/
function readTopicFile(err, data) {
// Handle file read error
if (err) {
utilities.logError('Error reading topic file', err);
return;
}
// Define our topic data
let obj = JSON.parse(data);
// Define our consumer
consumer = new Consumer(
client,
obj,
{
autoCommit: false,
fromOffset: 'latest'
}
);
// Start out consumer
setTimeout(function () {
startConsumer();
}, 100);
}
/**
* Wait for messages from our subscribed topics
*/
function startConsumer() {
console.log('consumer waiting for data ...')
consumer.on('message', function (message) {
utilities.storeRecord(message); // Error in this utility function if timeout isn't in place
});
}
Для контекста, вот мой файл утилит:
storeRecord: function (payload) {
// Define our postgres instance
var pg = require('pg');
var utilities = require('./utilities');
// Connect to the postgres instance
pg.connect(process.env.DATABASE_URL, function (err, conn, done) {
// watch for any connect issues
if (err) {
logError('Unable to connect to postgres', err);
}
// Check to see if we got our payload
if (payload) {
// Holds our payload data
const val = JSON.parse(payload.value).data; // <<<--- Without the setTimeout on the consumer.js, this throws the error.
// Insert the data
const sql = 'INSERT INTO communications (firstname, lastname, age, department, campus, state) VALUES ($1, $2, $3, $4, $5, $6)';
const values = [val.firstName, val.lastName, val.age, val.department, val.campus, val.state];
conn.query(sql, values, (err, result) => {
done();
if (err) {
utilities.logError('Error inserting data into table', err);
} else {
utilities.logError('Inserted data', result);
}
});
}
});
}
В приведенном выше коде мой utilities.storeRecord
выдает ошибку, где этоожидая получить данные от message
.
Мой вопрос касается того, почему использование setTimeout
устраняет проблему синхронизации.Ошибка, которую выдает мой файл утилит: TypeError: Cannot read property 'data' of null
.
Судя по тому, что я могу сказать, это почти так, как если бы new Consumer
не успел инициализироваться и не знает, что использует fromOffset: 'latest'
,Я считаю, что это так, поскольку message
не должно содержать ничего в событии consumer.on('message', ...
.
Если я регистрирую message
С таймаутом, данные не передаются.Если я регистрирую message
БЕЗ тайм-аута, он перечисляет все сообщения в начале темы (потому что он не знает, что мы хотим только самую последнюю версию).
Я застрял с этим таймаутом илиЕсть ли лучший способ инициализировать Consumer
, чтобы событие узнало об этом раньше?