Я пишу Node.js приложение, которое читает сообщения в кодировке Avro из очень загруженных тем кафки (миллионы сообщений в день), распаковывает их, декодирует их и записывает их в базу данных PostgreSQL. Проблема в том, что программа запускается в течение минуты или двух, а затем вылетает на JavaScript куча из-за ошибки памяти. Я исследовал, где может быть проблема, и кажется, что это происходит, когда я пишу декодированное сообщение в базу данных. Если я просто регистрирую декодированные сообщения, все работает нормально. Или, если я пишу сообщения в базу данных, не расшифровывая их, это работает нормально. Но когда я вызываю свою функцию writeRow () для события decoder.on ('data'), у нее очень скоро заканчивается память. Не правильно ли я обрабатываю события, генерируемые декодером?
Я использую следующие модули:
- kafka js, kafka js -snappy, avs c для прием, распаковка и декодирование сообщений из тем kafka
- node- postgres для записи сообщений в базу данных
Вот соответствующий код:
Потребитель:
const run = async () => {
await consumer.connect();
await topics.forEach(topic => {
consumer.subscribe({ topic: topic, fromBeginning: false });
});
await consumer.run({
eachMessage: async ({ topic, message }) => {
try {
const buf = await Buffer.from(message.value, "binary");
const decoder = new avro.streams.BlockDecoder();
await decoder.on("data", async msg => {
console.log(`Message received: ${msg}`);
await writeRow(topic, new Date().toISOString(), msg);
});
await decoder.end(buf);
} catch (err) {
console.log(err);
}
}
});
};
Функция writeRow ():
const writeRow = async (topic, timestamp, data) => {
const client = await pool.connect();
try {
const res = await client.query(
`INSERT INTO ${topic.replace(
/\./g,
"_"
)}(topic, timestamp, data) VALUES('${topic}', '${timestamp}', '${JSON.stringify(
data
)}')`
);
console.log(`${res.rowCount} row written to database.`);
} catch (err) {
console.log(err);
} finally {
await client.release();
}
};