Зачем продюсеру нужен ключ привязки для обмена темой? - PullRequest
0 голосов
/ 28 апреля 2019

Точно, у меня есть два вопроса об обмене темами.

Q1: Я думаю, что только очереди и обмен связаны друг с другом, все в порядке. Почему производитель и обмен должныбыть привязанным?

Например,

# producer-->exchange--routing_key-->queues and customer
cat run.log |grep "routing_key_a" >> customer A;
cat run.log |grep "routing_key_b" >> customer B;

Я принимаю cat run.log в качестве источника сообщений и grep "routing_key_a" в качестве ключа_оценки, производитель создал все сообщения и разослал различным клиентам A и B.используя разные routing_keys.Это хорошо понять, поэтому я не понял, почему мы все еще нуждаемся в привязке производителя и обмена друг к другу?

Q2: Могу ли я использовать карты воли в привязках производителя для обмена (не карты воли)в обмен привязки к очереди)?Я пытался и столкнулся с некоторыми проблемами, как товарищ.

class RabbitmqServer:
    def __init__(self, host, port, nick_name="", topics=None, log_data=None):
        self.host = host
        self.port = port
        self.nick_name = nick_name
        self.connection = self._build_connection()
        self.channel = self._open_channel()
        self.exchange = self._declare_exchange()
        self.topics = topics if isinstance(topics, (list, tuple)) else []  # used to binding_key and routing_key
        self.log_data = log_data if log_data else []  # used to create log

    def _build_connection(self):
        return pika.BlockingConnection(
            pika.ConnectionParameters(host=self.host, port=self.port)
        )

    def _open_channel(self):
        return self.connection.channel()

    def _declare_exchange(self):
        return self.channel.exchange_declare(exchange="radius_logs", exchange_type="topic")

Ниже приведен код производителя.

class SimpleLogger(RabbitmqServer):
    def __init__(self, *args, **kwargs):
        super(SimpleLogger, self).__init__(*args, **kwargs)

    def _publish(self, message):
        i = 1
        while True:
            log_index = i % len(self.log_data)
            log = self.log_data[log_index]
            _message = json.dumps(f"[{log}] "
                                  f"There is a piece log from radius server: {message}-No.-{i}")
            for topic in self.topics:
                self.channel.basic_publish(
                    exchange="radius_logs",
                    routing_key=topic,  
                    body=_message
                )
            print(f"producer with message: {_message}")
            i += 1
            time.sleep(0.15)

    def run(self):
        try:
            self._publish("1PMC")
        except Exception as e:
            print(f"Found exception: {e}")
            print("Exit..")
            self.connection.close()

Ниже приведены потребители.

class SimpleConsumer(RabbitmqServer):
    def __init__(self, *args, **kwargs):
        super(SimpleConsumer, self).__init__(*args, **kwargs)

    def _call_back(self, ch, method, properties, body):
        print(f"consumer with name = {self.nick_name} received {body}")
        ch.basic_ack(delivery_tag=method.delivery_tag)
        time.sleep(0.4)

    def _consume(self):
        queue = self.channel.queue_declare(queue="", exclusive=True)
        queue_name = queue.method.queue

        for t in self.topics:
            self.channel.queue_bind(exchange="radius_logs", queue=queue_name, routing_key=t)

        self.channel.basic_consume(on_message_callback=self._call_back, queue=queue_name, auto_ack=False)
        self.channel.start_consuming()

    def run(self):
        try:
            self._consume()
        except Exception as e:
            print(f"Found exception: {e}")
            print("Exit..")
            self.connection.close()

Ниже приведеныкод запуска.

if __name__ == "__main__":
    r_host = "10.0.7.176"
    r_port = 5672

    p1_binding_keys = ["*.orange.*", ]
    c1_binding_keys = ["*.orange.*", ]
    c2_binding_keys = ["*.*.rabbit", "lazy.#"]
    log_data = [
        "quick.orange.rabbit",          # will send to C1
        "lazy.orange.elephant",         # will send to C1
        "quick.orange.fox",             # will send to C1
        "lazy.brown.fox",               # will send to nobody
        "lazy.pink.rabbit",             # will send to nobody
        "quick.brown.fox",              # will send to nobody
        "orange",                       # will send to nobody
        "quick.orange.male.rabbit",     # will send to nobody
        "lazy.orange.male.rabbit",      # will send to nobody
    ]
    p1 = SimpleLogger(host=r_host, port=r_port, nick_name="P1", log_data=log_data, topics=p1_binding_keys)
    c1 = SimpleConsumer(host=r_host, port=r_port, nick_name="C1", topics=c1_binding_keys)
    c2 = SimpleConsumer(host=r_host, port=r_port, nick_name="C2", topics=c2_binding_keys)

    t1 = threading.Thread(target=p1.run)
    t2 = threading.Thread(target=c1.run)
    t3 = threading.Thread(target=c2.run)

    t1.start()
    t2.start()
    t3.start()

Ожидание: log_data - список, используемый для создания содержимого журнала, ожидается только "quick.orange.rabbit", "lazy.orange.elephant", "lazy.brown.fox", отправка получателю C1, остальное будет потеряно.

На самом деле: На C2 не было отправлено сообщение, что соответствует нашим ожиданиям.

 ⚙  rabitmq_demo # grep -i "C2" 5.log
 ✘ ⚙  rabitmq_demo 

Однако все сообщения отправляются на C1, что отличается от ожидаемого,Почему сообщение типа "quick.brown.fox", может быть отправлено на C1?Я действительно запутался с привязкой ключа производителя к обмену, как сказал Вопрос1, я думаю, что нет необходимости связывать производителя обменом, я прав?Любой, кто мог бы объяснить это мне, большое спасибо.

 rabitmq_demo # grep -i "C1" run.log
consumer with name = C1 received b'"[quick.orange.fox] There is a piece log from radius server: 1PMC-No.-2"'
consumer with name = C1 received b'"[lazy.brown.fox] There is a piece log from radius server: 1PMC-No.-3"'
consumer with name = C1 received b'"[lazy.pink.rabbit] There is a piece log from radius server: 1PMC-No.-4"'
consumer with name = C1 received b'"[quick.brown.fox] There is a piece log from radius server: 1PMC-No.-5"'
consumer with name = C1 received b'"[orange] There is a piece log from radius server: 1PMC-No.-6"'
consumer with name = C1 received b'"[quick.orange.male.rabbit] There is a piece log from radius server: 1PMC-No.-7"'
consumer with name = C1 received b'"[lazy.orange.male.rabbit] There is a piece log from radius server: 1PMC-No.-8"'
consumer with name = C1 received b'"[quick.orange.rabbit] There is a piece log from radius server: 1PMC-No.-9"'
consumer with name = C1 received b'"[lazy.orange.elephant] There is a piece log from radius server: 1PMC-No.-10"'
consumer with name = C1 received b'"[quick.orange.fox] There is a piece log from radius server: 1PMC-No.-11"'
consumer with name = C1 received b'"[lazy.brown.fox] There is a piece log from radius server: 1PMC-No.-12"'
consumer with name = C1 received b'"[lazy.pink.rabbit] There is a piece log from radius server: 1PMC-No.-13"'
consumer with name = C1 received b'"[quick.brown.fox] There is a piece log from radius server: 1PMC-No.-14"'
consumer with name = C1 received b'"[orange] There is a piece log from radius server: 1PMC-No.-15"'

...