Превышено максимальное количество разрешенных получателей на раздел Ошибка в eventhub - PullRequest
0 голосов
/ 31 октября 2019

Как и в приведенном ниже коде, я постоянно получаю данные из Azure Eventhub. Часто я вижу сообщение об ошибке «Превышено максимальное количество разрешенных получателей на раздел», откуда я знаю.

import os
import sys
import logging
import time
from azure.eventhub import EventHubClient, Receiver, Offset

logger = logging.getLogger("azure")

ADDRESS = ""
USER = ""
KEY = ""

CONSUMER_GROUP = "$default"
OFFSET = Offset("@latest")
PARTITION = "0"

total = 0
last_sn = -1
last_offset = "-1"
client = EventHubClient(ADDRESS, debug=False, username=USER, password=KEY)

receiver = client.add_receiver(CONSUMER_GROUP, PARTITION, prefetch=0, offset=OFFSET)    
client.run()
start_time = time.time()
while True:
    for event_data in receiver.receive(timeout=5000):
        print("Received: {}".format(event_data.body_as_str(encoding='UTF-8')))
        a = event_data.body_as_str(encoding='UTF-8')
        total += 1
    end_time = time.time()
    run_time = end_time - start_time
    print("Received {} messages in {} seconds".format(total, run_time))

Здесь указана строка, добавляющая получателя, и если я добавлю больше, чемпять приемников, он достигает предела возможного количества приемников на раздел.

receiver = client.add_receiver(CONSUMER_GROUP, PARTITION, prefetch=0
  1. Итак, я попытался удалить приемник с функциями в нем. например, используя receiver.client.clients.remove или receiver.client.clients.clear(), чтобы очистить ранее добавленные получатели. Однако ни один из этих методов не работает.

Причина, по которой я вижу эту ошибку, заключается в том, что, поскольку я запускаю весь приведенный выше код всякий раз, когда мне нужно прекратить запуск сценария для отладки, поэтому всякий раз, когда я перезапускаю его, мне приходится запускать строку receiver = client.add_receiver(CONSUMER_GROUP, PARTITION, prefetch=0

Я также пытался запустить только часть кода ниже строки "add_receiver"
client.run()
start_time = time.time()
while True:
    for event_data in receiver.receive(timeout=5000):
        print("Received: {}".format(event_data.body_as_str(encoding='UTF-8')))
        a = event_data.body_as_str(encoding='UTF-8')
        total += 1
    end_time = time.time()
    run_time = end_time - start_time
    print("Received {} messages in {} seconds".format(total, run_time))

Однако я вижу еще одну ошибку, сообщающую, что EventHubError: This receive handler is now closed.

Любые возможные способырешить эту проблему?

1 Ответ

1 голос
/ 31 октября 2019

После борьбы, я думаю, что это может быть решением этой проблемы.

Для строки 22, приведенной выше, я мог бы просто добавить к ней вход «keep_alive», например:

receiver = client.add_receiver(CONSUMER_GROUP, PARTITION, prefetch=0, offset=OFFSET, keep_alive = 10000000)  

Таким образом, я могу оставить приемник клиента открытым и не получитьсообщение об ошибке «EventHubError: этот обработчик получения теперь закрыт.», затем просто запустите единственную часть снизу:

start_time = time.time()
while True:
    for event_data in receiver.receive(timeout=5000):
        print("Received: {}".format(event_data.body_as_str(encoding='UTF-8')))
        a = event_data.body_as_str(encoding='UTF-8')
        total += 1
    end_time = time.time()
    run_time = end_time - start_time
    print("Received {} messages in {} seconds".format(total, run_time))
...