Я пытаюсь кодировать простого потребителя, используя librabbitmq.Это работает, но когда я выполняю amqp_basic_consume, он потребляет всю очередь.Я хочу, чтобы оно получило одно сообщение, обработало его и повторило.
Я пытался использовать basic_qos для предварительной выборки потребителем 1 за один раз, но, похоже, это никак не отразилось.
Базовая настройка и цикл: // устанавливаем qos 1 сообщения за раз, если (! Amqp_basic_qos (conn, channel, 0, 1, 0)) {die_on_amqp_error (amqp_get_rpc_reply (conn), "basic.qos");}
// Consuming the message
amqp_basic_consume(conn, channel, queue, amqp_empty_bytes, no_local, no_ack, exclusive, amqp_empty_table);
while (run) {
amqp_rpc_reply_t result;
amqp_envelope_t envelope;
amqp_maybe_release_buffers(conn);
result = amqp_consume_message(conn, &envelope, &timeout, 0);
if (AMQP_RESPONSE_NORMAL == result.reply_type) {
strncpy(message, envelope.message.body.bytes, envelope.message.body.len);
message[envelope.message.body.len] = '\0';
printf("Received message size: %d\nbody: -%s-\n", (int) envelope.message.body.len, message );
if ( strncmp(message, "DONE",4 ) == 0 )
{
printf("XXXXXXXXXXXXXXXXXX Cease message received. XXXXXXXXXXXXXXXXXXXXX\n");
run = 0;
}
amqp_destroy_envelope(&envelope);
}else{
printf("Timeout.\n");
run = 0;
}
}
Я ожидаю, что очередь заполнена, и я могу начать обработку, и если я нажму ^ C, остальные сообщения все еще находятся в очереди.Вместо этого, даже если я обработал только одно сообщение, вся очередь очищается.