Вот как инициализируется мой потребитель:
const client = new kafka.Client(config.ZK_HOST)
const consumer = new kafka.Consumer(client, [{ topic: config.KAFKA_TOPIC, offset: 0}],
{
autoCommit: false
})
Теперь потребитель consumer.on('message', message => applyMessage(message))
Дело в том, что applyMessage
обращается к базе данных, используя knex
, код выглядит примерно так:
async function applyMessage(message: kafka.Message) {
const usersCount = await db('users').count()
// just assume we ABSOLUTELY need to calculate a number of users,
// so we need previous state
await db('users').insert(inferUserFromMessage(message))
}
Приведенный выше код заставляет applyMessage
выполняться параллельно для всех сообщений в kafka, поэтому в приведенном выше коде, учитывая, что в базе еще нет пользователей, usersCount
ВСЕГДА будет равен 0 даже для второго сообщения от Кафка, где это должно быть 1 уже с первого вызова applyMessage
вставляет пользователя.
Как мне "синхронизировать" код так, чтобы все функции applyMessage
выполнялись последовательно?