ActiveMQ долго работает python потребительских процессов застрял - PullRequest
0 голосов
/ 10 марта 2020

У меня есть потребитель ActiveMQ, записанный в библиотеке Python stomp.py. Это долгосрочные потребители (например, 1 неделя). Мы отключили сердцебиение (0,0). Через некоторое время (например, 20 часов) потребители застревают, когда выбирают сообщение. Мы написали потребителю механизм переподключения, который будет переподключаться после сбоя соединения, но потребители все равно застряли.

Logi c:

  1. try для подключения
  2. is_connected () #true или false
  3. , если false -> добрался до шага 1

Подробности среды:

  1. Python 3.7
  2. stomp.py 4.1.22
  3. ActiveMQ 5.15.8 [настройка сети брокеров с активным / резервным {главным / подчиненным}]]

Python код:

import ssl
import stomp
LOG = custom_logger.get_logger("core")

class ActiveMQConsumer(ConsumerInterface):

    def __init__(self, service_conf):
        """
        Initialize the ActiveMQConsumer class that call create connection and adds listener.
        Args:
            service_conf : ActiveMQ configuration dict.
        Returns:
            None
        Raises:
            ConnectFailedException : Error while unable to connect to ActiveMQ
            Exception : Base exception to catch all other exception
        """
        try:
            LOG.verbose("Entry")
            self.username = service_conf["username"]
            self.password = service_conf["password"]
            self.hosts = service_conf["hosts"]
            self.ports = service_conf["ports"]
            self.callback = None
            self.subscription_details = {}
            self.heartbeats = service_conf.get("heartbeats", (0, 0))
            self.heart_beat_receive_scale = service_conf.get("heart_beat_receive_scale", 2.0)
            self.listener_obj = ConsumerListener(self)
            self.create_connection()

        except stomp.exception.ConnectFailedException as stomp_ex:
            raise stomp_ex
        except Exception as ex:
            raise ex

    def create_connection(self):
        """
        create connection with activemq brokers
        Args:
            None
        Returns:
            None
        Raises:
            Exception : Base exception to catch all exception
        """
        LOG.verbose("Entry")
        exclude_hosts = []
        for retry_count in range(MAX_CONNECTION_ATTEMPTS):
            try:
                selected_host, selected_port = select_host(self.hosts, self.ports,
                                                           exclude_hosts)
                conn_param = zip(selected_host, selected_port)
                self.conn = stomp.Connection11(conn_param, heartbeats=self.heartbeats,
                                               encoding=ENCODE_FORMAT,
                                               heart_beat_receive_scale=\
                                               self.heart_beat_receive_scale,
                                               reconnect_attempts_max=MAX_STOMP_RECONNECT_ATTEMPTS,
                                               reconnect_sleep_increase=RECONNECT_SLEEP_INCREASE)
                self.conn.set_ssl(for_hosts=list(zip(selected_host, selected_port)),
                                  ssl_version=ssl.PROTOCOL_TLS)
                self.conn.set_listener('connection_listener', self.listener_obj)
                self.conn.start()
                self.conn.connect(self.username, self.password, wait=True)
                LOG.info("Connected to activemq host :: "+str(selected_host))
                LOG.verbose("Exit")
                break
            except stomp.exception.ConnectFailedException as stomp_ex:
                LOG.info("retry fail count::"+str(retry_count))
                exclude_hosts.extend(selected_host)
                if len(exclude_hosts) == len(self.hosts):
                    exclude_hosts = []
                if (retry_count+1) == MAX_CONNECTION_ATTEMPTS:
                    LOG.verbose("Exit")
                    raise stomp_ex

    def __subscribe(self):
        """
        subscribes to queue in ActiveMQ broker
        Args:
            None
        Returns:
            None
        Raises:
            Exception : Base exception to catch all exception
        """
        try:
            LOG.verbose("Entry")
            subscription_id = 0
            for (destination, ack_mode) in self.subscription_details.items():
                self.conn.subscribe(destination=destination, id=subscription_id,
                                    ack=ACK_MODES[ack_mode],
                                    headers={"activemq.prefetchSize":1}
                                    )
                subscription_id += 1
            LOG.verbose("Exit")
        except Exception as ex:
            LOG.verbose("Exit")
            raise ex

    def consume(self, subscription_details, callback, **kwargs):
        """
        call subscription for activemq and provides connection failure retry with sleep.
        Args:
            subscriber_details : list contains queue names  with configuration(ack_mode).
            callback : callback function to send messages.
        Returns:
            None
        Raises:
            Exception : Base exception to catch all exception
        """
        try:
            LOG.verbose("Entry")
            self.callback = callback
            for (destination, config) in subscription_details.items():
                self.subscription_details[destination] = config.get("ack_mode", 2)
            while True:
                self.__subscribe()
                while self.conn.is_connected():
                    if os.getenv("CONTAINER_STOP", "FALSE") == "TRUE":
                        raise ContainerStoppedError()
                    #this loop holds the main thread till activemq connection available
                    time.sleep(CONSUMER_SLEEP_TIME)
                self.__reconnect()
        except Exception as ex:
            LOG.verbose("Exit")
            raise ex

    def __reconnect(self):
        """
        closes the inactive connection and creates the new connection with activemq.
        Args:
            None
        Returns:
            None
        Raises:
            None
        """
        LOG.verbose("Entry")
        reconnect_counter = 0
        while not self.conn.is_connected():
            try:
                self.close_connection()
                time.sleep(CONSUMER_RETRY_WAIT_TIME)
                self.create_connection()
                LOG.info("Connected to ActiveMQ...")
            except stomp.exception.StompException:
                reconnect_counter += 1
                LOG.info("reconnect failure count :: %s"%reconnect_counter)
        LOG.verbose("Exit")

    def close_connection(self):
        """
        closes the connection.
        Args:
            None
        Returns:
            None
        Raises:
            None
        """
        LOG.verbose("Entry")
        self.conn.disconnect()
        LOG.verbose("Exit")
...