JavaScript куча памяти при вызове асинхронной функции c для события - PullRequest
0 голосов
/ 05 февраля 2020

Я пишу Node.js приложение, которое читает сообщения в кодировке Avro из очень загруженных тем кафки (миллионы сообщений в день), распаковывает их, декодирует их и записывает их в базу данных PostgreSQL. Проблема в том, что программа запускается в течение минуты или двух, а затем вылетает на JavaScript куча из-за ошибки памяти. Я исследовал, где может быть проблема, и кажется, что это происходит, когда я пишу декодированное сообщение в базу данных. Если я просто регистрирую декодированные сообщения, все работает нормально. Или, если я пишу сообщения в базу данных, не расшифровывая их, это работает нормально. Но когда я вызываю свою функцию writeRow () для события decoder.on ('data'), у нее очень скоро заканчивается память. Не правильно ли я обрабатываю события, генерируемые декодером?

Я использую следующие модули:

  • kafka js, kafka js -snappy, avs c для прием, распаковка и декодирование сообщений из тем kafka
  • node- postgres для записи сообщений в базу данных

Вот соответствующий код:

Потребитель:

const run = async () => {
  await consumer.connect();
  await topics.forEach(topic => {
    consumer.subscribe({ topic: topic, fromBeginning: false });
  });

  await consumer.run({
    eachMessage: async ({ topic, message }) => {
      try {
        const buf = await Buffer.from(message.value, "binary");
        const decoder = new avro.streams.BlockDecoder();
        await decoder.on("data", async msg => {
          console.log(`Message received: ${msg}`);
          await writeRow(topic, new Date().toISOString(), msg);
        });
        await decoder.end(buf);
      } catch (err) {
        console.log(err);
      }
    }
  });
};

Функция writeRow ():

const writeRow = async (topic, timestamp, data) => {
  const client = await pool.connect();
  try {
    const res = await client.query(
      `INSERT INTO ${topic.replace(
        /\./g,
        "_"
      )}(topic, timestamp, data) VALUES('${topic}', '${timestamp}', '${JSON.stringify(
        data
      )}')`
    );
    console.log(`${res.rowCount} row written to database.`);
  } catch (err) {
    console.log(err);
  } finally {
    await client.release();
  }
};
...