Как и в приведенном ниже коде, я постоянно получаю данные из Azure Eventhub. Часто я вижу сообщение об ошибке «Превышено максимальное количество разрешенных получателей на раздел», откуда я знаю.
import os
import sys
import logging
import time
from azure.eventhub import EventHubClient, Receiver, Offset
logger = logging.getLogger("azure")
ADDRESS = ""
USER = ""
KEY = ""
CONSUMER_GROUP = "$default"
OFFSET = Offset("@latest")
PARTITION = "0"
total = 0
last_sn = -1
last_offset = "-1"
client = EventHubClient(ADDRESS, debug=False, username=USER, password=KEY)
receiver = client.add_receiver(CONSUMER_GROUP, PARTITION, prefetch=0, offset=OFFSET)
client.run()
start_time = time.time()
while True:
for event_data in receiver.receive(timeout=5000):
print("Received: {}".format(event_data.body_as_str(encoding='UTF-8')))
a = event_data.body_as_str(encoding='UTF-8')
total += 1
end_time = time.time()
run_time = end_time - start_time
print("Received {} messages in {} seconds".format(total, run_time))
Здесь указана строка, добавляющая получателя, и если я добавлю больше, чемпять приемников, он достигает предела возможного количества приемников на раздел.
receiver = client.add_receiver(CONSUMER_GROUP, PARTITION, prefetch=0
- Итак, я попытался удалить приемник с функциями в нем. например, используя
receiver.client.clients.remove
или receiver.client.clients.clear()
, чтобы очистить ранее добавленные получатели. Однако ни один из этих методов не работает.
Причина, по которой я вижу эту ошибку, заключается в том, что, поскольку я запускаю весь приведенный выше код всякий раз, когда мне нужно прекратить запуск сценария для отладки, поэтому всякий раз, когда я перезапускаю его, мне приходится запускать строку receiver = client.add_receiver(CONSUMER_GROUP, PARTITION, prefetch=0
Я также пытался запустить только часть кода ниже строки "add_receiver"
client.run()
start_time = time.time()
while True:
for event_data in receiver.receive(timeout=5000):
print("Received: {}".format(event_data.body_as_str(encoding='UTF-8')))
a = event_data.body_as_str(encoding='UTF-8')
total += 1
end_time = time.time()
run_time = end_time - start_time
print("Received {} messages in {} seconds".format(total, run_time))
Однако я вижу еще одну ошибку, сообщающую, что
EventHubError: This receive handler is now closed.
Любые возможные способырешить эту проблему?