У меня есть потребитель ActiveMQ, написанный на Python с использованием библиотеки stomp.py. Это долгоживущие потребители (работают, например, неделю без перезапуска). Мы отключили heartbeat («сердцебиение», `heartbeats=(0, 0)`). Через некоторое время (например, через 20 часов) потребители зависают и перестают забирать сообщения. Мы добавили в потребитель механизм переподключения, который должен переподключаться после сбоя соединения, но потребители всё равно зависают.
Логика:
- попытка подключения;
- проверка `is_connected()` (возвращает true или false);
- если false — вернуться к шагу 1.
Подробности среды:
- Python 3.7
- stomp.py 4.1.22
- ActiveMQ 5.15.8 (сеть брокеров в конфигурации «активный/резервный», т. е. master/slave)
Код на Python:
import ssl
import stomp
# Module-level logger shared by the consumer. custom_logger is a project helper not
# shown here; the code below calls LOG.verbose(), so it presumably adds a custom
# "verbose" level on top of the usual ones (confirm against custom_logger).
LOG = custom_logger.get_logger("core")
class ActiveMQConsumer(ConsumerInterface):
    """
    Long-running STOMP consumer for ActiveMQ built on stomp.py's Connection11.

    The main thread parks in consume() while self.conn.is_connected() is True and
    rebuilds the connection once it turns False. stomp.py can only flip that flag
    when the socket reports an error or when expected STOMP heart-beats stop
    arriving. With heart-beating disabled ((0, 0)) a half-open TCP connection
    (broker host gone, or an idle connection silently dropped somewhere on the
    network) is never noticed: is_connected() stays True and the consumer waits
    forever. Heart-beats are therefore enabled by default (DEFAULT_HEARTBEATS);
    pass "heartbeats": (0, 0) in the configuration to opt out explicitly.

    NOTE(review): messages are handled by ConsumerListener (not shown), presumably
    on stomp.py's receiver thread. If handling one message takes longer than the
    receive heart-beat window, stomp.py may report a heart-beat timeout; tune
    "heart_beat_receive_scale" or hand work off to another thread. Confirm.
    """

    # (client-send, client-receive) STOMP heart-beat intervals in milliseconds,
    # used when the configuration does not provide "heartbeats".
    DEFAULT_HEARTBEATS = (10000, 10000)

    def __init__(self, service_conf):
        """
        Read the connection settings and open the first broker connection.
        Args:
            service_conf : ActiveMQ configuration dict. Required keys: "username",
                "password", "hosts", "ports". Optional keys: "heartbeats"
                (two-item sequence of milliseconds, defaults to DEFAULT_HEARTBEATS)
                and "heart_beat_receive_scale" (defaults to 2.0).
        Returns:
            None
        Raises:
            KeyError : a required configuration key is missing
            ConnectFailedException : no broker accepted the connection within
                MAX_CONNECTION_ATTEMPTS attempts
            Exception : anything else raised while building the listener or the
                connection propagates unchanged
        """
        LOG.verbose("Entry")
        self.username = service_conf["username"]
        self.password = service_conf["password"]
        self.hosts = service_conf["hosts"]
        self.ports = service_conf["ports"]
        self.callback = None
        self.subscription_details = {}
        # stomp.py compares heartbeats against the tuple (0, 0) and %-formats them
        # into the CONNECT header, both of which need a tuple; configs loaded from
        # JSON/YAML yield lists, so normalise here.
        self.heartbeats = tuple(service_conf.get("heartbeats", self.DEFAULT_HEARTBEATS))
        if self.heartbeats == (0, 0):
            LOG.info("STOMP heart-beating is disabled; broken connections may go undetected")
        self.heart_beat_receive_scale = service_conf.get("heart_beat_receive_scale", 2.0)
        self.listener_obj = ConsumerListener(self)
        self.create_connection()

    def create_connection(self):
        """
        Create a connection to one of the configured ActiveMQ brokers.

        Makes up to MAX_CONNECTION_ATTEMPTS attempts. Hosts that failed are passed
        to select_host() as exclude_hosts (presumably so the next attempt picks a
        different broker); once every host has been excluded the list is reset.
        Args:
            None
        Returns:
            None
        Raises:
            ConnectFailedException : the last of MAX_CONNECTION_ATTEMPTS attempts failed
        """
        LOG.verbose("Entry")
        exclude_hosts = []
        for retry_count in range(MAX_CONNECTION_ATTEMPTS):
            selected_host, selected_port = select_host(self.hosts, self.ports,
                                                       exclude_hosts)
            # Materialise once: zip() is a one-shot iterator in Python 3 and the
            # pairs are needed both for the transport and for the SSL settings.
            host_and_ports = list(zip(selected_host, selected_port))
            try:
                self.conn = stomp.Connection11(host_and_ports, heartbeats=self.heartbeats,
                                               encoding=ENCODE_FORMAT,
                                               heart_beat_receive_scale=\
                                               self.heart_beat_receive_scale,
                                               reconnect_attempts_max=MAX_STOMP_RECONNECT_ATTEMPTS,
                                               reconnect_sleep_increase=RECONNECT_SLEEP_INCREASE)
                self.conn.set_ssl(for_hosts=host_and_ports, ssl_version=ssl.PROTOCOL_TLS)
                self.conn.set_listener('connection_listener', self.listener_obj)
                self.conn.start()
                self.conn.connect(self.username, self.password, wait=True)
            except stomp.exception.ConnectFailedException:
                LOG.info("retry fail count::"+str(retry_count))
                exclude_hosts.extend(selected_host)
                # ">=" rather than "==" so the reset still happens if more hosts
                # were excluded than are configured (e.g. duplicates).
                if len(exclude_hosts) >= len(self.hosts):
                    exclude_hosts = []
                if (retry_count + 1) == MAX_CONNECTION_ATTEMPTS:
                    LOG.verbose("Exit")
                    raise
            else:
                LOG.info("Connected to activemq host :: "+str(selected_host))
                LOG.verbose("Exit")
                return

    def __subscribe(self):
        """
        Subscribe the current connection to every destination in
        self.subscription_details, numbering subscription ids from 0.
        Args:
            None
        Returns:
            None
        Raises:
            Exception : errors from stomp.py's subscribe(), or a KeyError for an
                ack mode missing from ACK_MODES, propagate to the caller
        """
        LOG.verbose("Entry")
        try:
            subscriptions = self.subscription_details.items()
            for subscription_id, (destination, ack_mode) in enumerate(subscriptions):
                # With a prefetch of 1 ActiveMQ hands out the next message only
                # after the previous one is acknowledged (for client ack modes);
                # an un-acked message therefore stalls this subscription. Make
                # sure the listener always acks/nacks. TODO confirm in listener.
                self.conn.subscribe(destination=destination, id=subscription_id,
                                    ack=ACK_MODES[ack_mode],
                                    headers={"activemq.prefetchSize": 1})
        finally:
            LOG.verbose("Exit")

    @staticmethod
    def __container_stopping():
        """Return True when the CONTAINER_STOP environment variable is "TRUE"."""
        return os.getenv("CONTAINER_STOP", "FALSE") == "TRUE"

    def consume(self, subscription_details, callback, **kwargs):
        """
        Subscribe to the given destinations and keep consuming until the container
        is stopped, reconnecting whenever the broker connection is lost.
        Args:
            subscription_details : dict mapping destination name to a config dict;
                its "ack_mode" key (default 2) is looked up in ACK_MODES.
            callback : callback function to send messages to (stored on
                self.callback, presumably for ConsumerListener).
            kwargs : accepted for interface compatibility; unused.
        Returns:
            None (only exits by raising)
        Raises:
            ContainerStoppedError : CONTAINER_STOP env var was set to "TRUE"
            Exception : any other error propagates to the caller
        """
        try:
            LOG.verbose("Entry")
            self.callback = callback
            for (destination, config) in subscription_details.items():
                self.subscription_details[destination] = config.get("ack_mode", 2)
            while True:
                self.__subscribe()
                # Hold the main thread while the connection is alive. Note that
                # is_connected() only becomes False once stomp.py notices the
                # failure (socket error or missing heart-beats).
                while self.conn.is_connected():
                    if self.__container_stopping():
                        raise ContainerStoppedError()
                    time.sleep(CONSUMER_SLEEP_TIME)
                self.__reconnect()
        except Exception:
            LOG.verbose("Exit")
            raise

    def __reconnect(self):
        """
        Close the dead connection and keep creating new ones until one is connected.

        Retries without an upper bound, waiting CONSUMER_RETRY_WAIT_TIME between
        attempts.
        Args:
            None
        Returns:
            None
        Raises:
            ContainerStoppedError : CONTAINER_STOP env var was set to "TRUE" while
                waiting for the broker
            Exception : non-STOMP errors from create_connection() propagate
        """
        LOG.verbose("Entry")
        reconnect_counter = 0
        while not self.conn.is_connected():
            # Let the container shut down even while the broker is unreachable,
            # mirroring the check in consume().
            if self.__container_stopping():
                raise ContainerStoppedError()
            # Tearing down the broken connection is best effort: disconnect() on a
            # connection that is already gone may itself fail. If that failure shared
            # a try-block with create_connection(), it would skip the sleep and the
            # reconnect, and this loop would spin forever without reconnecting.
            try:
                self.close_connection()
            except Exception as close_ex:  # pylint: disable=broad-except
                LOG.info("ignoring error while closing stale connection :: %s" % close_ex)
            time.sleep(CONSUMER_RETRY_WAIT_TIME)
            try:
                self.create_connection()
                LOG.info("Connected to ActiveMQ...")
            except stomp.exception.StompException:
                reconnect_counter += 1
                LOG.info("reconnect failure count :: %s"%reconnect_counter)
        LOG.verbose("Exit")

    def close_connection(self):
        """
        Disconnect the current stomp.py connection.
        Args:
            None
        Returns:
            None
        Raises:
            Exception : whatever stomp.py's disconnect() raises propagates unchanged
        """
        LOG.verbose("Entry")
        self.conn.disconnect()
        LOG.verbose("Exit")