У меня есть сценарий, в котором по заданной теме мне нужно поочередно использовать каждое сообщение, выполнять некоторую асинхронную задачу, а затем использовать следующую.Я использую rabbitmq и amqp.node .
Я смог добиться этого с prefetch из 1. Что, конечно, нефактическое решение, так как это заблокировало бы весь канал, и канал имеет несколько тем.
Пока это мой продюсер:
const getChannel = require("./getChannel");
async function run() {
const exchangeName = "taskPOC";
const url = "amqp://queue";
const channel = await getChannel({ url, exchangeName });
const topic = "task.init";
let { queue } = await channel.assertQueue(topic, {
durable: true
});
const max = 10;
let current = 0;
const intervalId = setInterval(() => {
current++;
if (current === max) {
clearInterval(intervalId);
return;
}
const payload = JSON.stringify({
foo: "bar",
current
});
channel.sendToQueue(queue, Buffer.from(payload), { persistent: true });
}, 3000);
}
run()
.then(() => {
console.log("Running");
})
.catch(err => {
console.log("error ", err);
});
А это мой потребитель
const getChannel = require("./getChannel");
async function run() {
const exchangeName = "taskPOC";
const url = "amqp://queue";
const channel = await getChannel({ url, exchangeName });
channel.prefetch(1);
const topic = "task.init";
const { queue } = await channel.assertQueue(topic, {
durable: true
});
channel.bindQueue(queue, exchangeName, topic);
let last = new Date().getTime();
channel.consume(
queue,
msg => {
const now = new Date().getTime();
console.log(
" [x] %s %s:'%s' ",
msg.fields.routingKey,
Math.floor((now - last) / 1000),
msg.content.toString()
);
last = now;
setTimeout(function() {
channel.ack(msg);
}, 10000);
},
{ exclusive: true, noAck: false }
);
}
run()
.then(() => {
console.log("Running");
})
.catch(err => {
console.log("error ", err);
});
Есть ли на RabbitMQ какой-либо способ сделать это, или мне нужно будет обработать это в моем приложении?
Спасибо.