Завершение соединения с RabbitMQ - PullRequest
0 голосов
/ 04 июня 2018

Я создал простое консольное приложение для прослушивания сообщений на Rabbit MQ.Работает отлично.Там нет проблем.Но как мне закрыть соединение.Я много гуглял вокруг, но не нашел четких ответов.Наиболее близкий ответ на этот вопрос - это вопрос SO: Каков наилучший способ безопасного завершения Java-приложения с использованием потребителей RabbitMQ , но в ответе пропущена самая важная часть: код!

Вот мой код:

package com.company;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeoutException;

interface ObjectRecievedListener {
    void objectRecieved(Object obj);
}

public class RabbitMQReceiver extends RabbitMQBase
{

    ArrayList<DefaultConsumer> consumers = new ArrayList<>();

    private List<ObjectRecievedListener> listeners = new ArrayList<>();
    private final Connection connection;
    private final Channel channel;

    public RabbitMQReceiver(RabbitMQProperties properties) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(properties.getHost());
        factory.setPassword(properties.getPassword());
        factory.setUsername(properties.getLogin());

        connection = factory.newConnection();
        channel = connection.createChannel();
        channel.queueDeclare(properties.getInboundQueueName(), true, false, false, null);
        channel.basicQos(1);

        final Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");


                try {
                    doWork(message);
                } finally {
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            }
        };

        consumers.add((DefaultConsumer) consumer);

        boolean autoAck = false;
        channel.basicConsume(properties.getInboundQueueName(), autoAck, consumer);
    }

    public void addListener(ObjectRecievedListener listener) {
        listeners.add(listener);
    }

    public void stop() {
        try {
            channel.close();
            connection.close();
        } catch (Exception e) {
        }
    }

    private void doWork(String message) {
        Object obj = getObjectFromXML(message);

        for (ObjectRecievedListener l : listeners)
            l.objectRecieved(obj);
        stop();
    }
}

Но это не останавливается только потому, что я позвонил stop()

Короче говоря: Как мне закрыть соединение сКролик MQ?

Ответы [ 2 ]

0 голосов
/ 05 июня 2018

Команда RabbitMQ отслеживает список рассылки rabbitmq-users и только иногда отвечает на вопросы по StackOverflow.


Когда вы говорите «это не останавливается», вы подтвердили, чтометод stop() на самом деле вызывается?Ваш код висит на одном из close() методов?

Экземпляр channel будет иметь basicCancel метод , который можно вызвать для отмены текущего потребителя перед закрытием канала.и связь.Если честно, закрытие канала отменит вашего потребителя, поэтому я сомневаюсь, что это является основной причиной проблемы.

0 голосов
/ 04 июня 2018

Попробуйте это:

let connection = null;

    connection = await amqp.connect(`amqp://${config.username}:${config.password}@${config.host}:${config.port}`);

    connection.close();
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...