У RabbitMQ есть эксклюзивное сообщение о потреблении потребителем серийно - PullRequest
0 голосов
/ 04 декабря 2018

У меня есть сценарий, в котором по заданной теме мне нужно поочередно использовать каждое сообщение, выполнять некоторую асинхронную задачу, а затем использовать следующую.Я использую rabbitmq и amqp.node .

Я смог добиться этого с prefetch из 1. Что, конечно, нефактическое решение, так как это заблокировало бы весь канал, и канал имеет несколько тем.

Пока это мой продюсер:

const getChannel = require("./getChannel");
async function run() {
    const exchangeName = "taskPOC";
    const url = "amqp://queue";
    const channel = await getChannel({ url, exchangeName });
    const topic = "task.init";
    let { queue } = await channel.assertQueue(topic, {
        durable: true
    });
    const max = 10;
    let current = 0;
    const intervalId = setInterval(() => {
        current++;
        if (current === max) {
        clearInterval(intervalId);
        return;
        }
        const payload = JSON.stringify({
        foo: "bar",
        current
        });

        channel.sendToQueue(queue, Buffer.from(payload), { persistent: true });
    }, 3000);

}
run()
.then(() => {
    console.log("Running");
})
.catch(err => {
    console.log("error ", err);
});

А это мой потребитель

const getChannel = require("./getChannel");
async function run() {
    const exchangeName = "taskPOC";
    const url = "amqp://queue";
    const channel = await getChannel({ url, exchangeName });
    channel.prefetch(1);
    const topic = "task.init";
    const { queue } = await channel.assertQueue(topic, {
        durable: true
    });
    channel.bindQueue(queue, exchangeName, topic);
    let last = new Date().getTime();
    channel.consume(
        queue,
        msg => {
        const now = new Date().getTime();
        console.log(
            " [x] %s %s:'%s' ",
            msg.fields.routingKey,
            Math.floor((now - last) / 1000),
            msg.content.toString()
        );
        last = now;
        setTimeout(function() {
            channel.ack(msg);
        }, 10000);
        },
        { exclusive: true, noAck: false }
    );
}
run()
.then(() => {
    console.log("Running");
})
.catch(err => {
    console.log("error ", err);
});

Есть ли на RabbitMQ какой-либо способ сделать это, или мне нужно будет обработать это в моем приложении?

Спасибо.

1 Ответ

0 голосов
/ 18 декабря 2018

Вы можете использовать настройку предварительной выборки потребителя (см. https://www.rabbitmq.com/consumer-prefetch.html).. В случае узла amqp вы можете установить эту опцию, используя функцию предварительной выборки:

channel.prefetch(1, false); // global=false

Inв этом случае каждый потребитель на канале будет иметь предварительную выборку 1. Если вы хотите иметь разные конфигурации для каждого потребителя, вы должны создать больше каналов.

Надеюсь, это поможет.

...