kafka-node асинхронный потребительский обработчик - PullRequest
0 голосов
/ 18 января 2019

Вот как инициализируется мой потребитель:

const client = new kafka.Client(config.ZK_HOST)
const consumer = new kafka.Consumer(client, [{ topic: config.KAFKA_TOPIC, offset: 0}],
{
    autoCommit: false
})

Теперь потребитель consumer.on('message', message => applyMessage(message))

Дело в том, что applyMessage обращается к базе данных, используя knex, код выглядит примерно так:

async function applyMessage(message: kafka.Message) {
    const usersCount = await db('users').count()
    // just assume we ABSOLUTELY need to calculate a number of users,
    // so we need previous state
    await db('users').insert(inferUserFromMessage(message))
}

Приведенный выше код заставляет applyMessage выполняться параллельно для всех сообщений в kafka, поэтому в приведенном выше коде, учитывая, что в базе еще нет пользователей, usersCount ВСЕГДА будет равен 0 даже для второго сообщения от Кафка, где это должно быть 1 уже с первого вызова applyMessage вставляет пользователя.

Как мне "синхронизировать" код так, чтобы все функции applyMessage выполнялись последовательно?

1 Ответ

0 голосов
/ 23 января 2019

Вам нужно будет внедрить какой-нибудь Mutex. В основном класс, который ставит в очередь вещи для синхронного выполнения. Пример

var Mutex = function() {
  this.queue = [];
  this.locked = false;
};

Mutex.prototype.enqueue = function(task) {
  this.queue.push(task);
  if (!this.locked) {
    this.dequeue();
  }
};

Mutex.prototype.dequeue = function() {
  this.locked = true;
  const task = this.queue.shift();
  if (task) {
    this.execute(task);
  } else {
    this.locked = false;
  }
};

Mutex.prototype.execute = async function(task) {
  try { await task(); } catch (err) { }
  this.dequeue();
}

Для того, чтобы это работало, ваша applyMessage функция (в зависимости от того, что обрабатывает сообщения Kafka) должна вернуть Promise - обратите внимание, что асинхронность перешла из родительской функции в возвращенную функцию Promise:

function applyMessage(message: kafka.Message) {
  return new Promise(async function(resolve,reject) {
    try {
      const usersCount = await db('users').count()
      // just assume we ABSOLUTELY need to calculate a number of users,
      // so we need previous state
      await db('users').insert(inferUserFromMessage(message))
      resolve();
    } catch (err) {
      reject(err);
    }
  });
}

Наконец, каждый вызов applyMessage должен добавляться в очередь Mutex, а не вызываться напрямую:

var mutex = new Mutex();
consumer.on('message', message => mutex.enqueue(function() { return applyMessage(message); }))
...