Как получить последние данные только в концентраторе событий - PullRequest
0 голосов
/ 22 октября 2019

В eventhub у меня есть оба сценария "отправитель" и "получатель" для связи между этими двумя.

Проблема, с которой я сталкиваюсь, заключается в том, что мне кажется, что я получаю набор данных, который я отправил вчера, плюс тот, который я только что отправил вместе. Я пытаюсь контролировать количество данных по периоду времени или количеству событий.

Основной код для sender.py следующий:


CONSUMER_GROUP = "$default"
OFFSET = Offset("-1")
PARTITION = "0"

total = 0
last_sn = -1
last_offset = "-1"
client = EventHubClient(ADDRESS, debug=False, username=USER, password=KEY)
try:
    receiver = client.add_receiver(
        CONSUMER_GROUP, PARTITION, prefetch=0, offset=OFFSET)
    client.run()
    start_time = time.time()
    batch = receiver.receive(timeout=100)

    for event_data in batch[-10:]:
        print("Received: {}".format(event_data.body_as_str(encoding='UTF-8')))
        total += 1

    end_time = time.time()
    client.stop()
    run_time = end_time - start_time
    print("Received {} messages in {} seconds".format(total, run_time))

except KeyboardInterrupt:
    pass
finally:
    client.stop()

1 Ответ

1 голос
/ 22 октября 2019

Я только что нашел решение, которое использует смещение для управления процессом чтения данных события.

Сначала нам нужно получить смещение данных события.

код, подобный приведенному ниже:

logger = logging.getLogger("azure")

ADDRESS = "amqps://xxx.servicebus.windows.net/xxx"
USER = "RootManageSharedAccessKey"
KEY = "xxx"

CONSUMER_GROUP = "$default"

#first, set offset to -1 to read all the event data
OFFSET = Offset("-1")
PARTITION = "0"

total = 0
last_sn = -1
last_offset = "-1"
client = EventHubClient(ADDRESS, debug=False, username=USER, password=KEY)
try:
    receiver = client.add_receiver(
        CONSUMER_GROUP, PARTITION, prefetch=5000, offset=OFFSET)
    client.run()
    start_time = time.time()
    print("**begin receive**")
    for event_data in receiver.receive(timeout=100):
        last_offset = event_data.offset.value
        last_sn = event_data.sequence_number
        #here, we print out the offset of each event data
        print("Received: {}, last_offset: {}, last_sn: {}".format(event_data.body_as_str(encoding='UTF-8'),last_offset,last_sn))        
        total += 1

    end_time = time.time()
    client.stop()
    run_time = end_time - start_time
    print("Received {} messages in {} seconds".format(total, run_time))

except KeyboardInterrupt:
    pass
finally:
    client.stop()

после выполнения вы можете увидеть все смещения каждой информации, снимок экрана, как показано ниже:

enter image description here

тогда вы знаете смещение данных каждого события. И если вы хотите получить данные с номера 40 на номер 53. Смещение для номера 40 равно 237080, поэтому в вашем коде измените смещение на значение чуть меньше 237080, установите его равным 237079 в этой строке кода OFFSET = Offset("237079").

Код, подобный приведенному ниже:

logger = logging.getLogger("azure")

ADDRESS = "amqps://xxx.servicebus.windows.net/xx"
USER = "RootManageSharedAccessKey"
KEY = "xxx"

CONSUMER_GROUP = "$default"

#set the offset
OFFSET = Offset("237079")
PARTITION = "0"

total = 0
last_sn = -1
last_offset = "-1"
client = EventHubClient(ADDRESS, debug=False, username=USER, password=KEY)
try:
    receiver = client.add_receiver(
        CONSUMER_GROUP, PARTITION, prefetch=5000, offset=OFFSET)
    client.run()
    start_time = time.time()
    print("**begin receive**")
    for event_data in receiver.receive(timeout=100):
        last_offset = event_data.offset.value
        last_sn = event_data.sequence_number
        print("Received: {}, last_offset: {}, last_sn: {}".format(event_data.body_as_str(encoding='UTF-8'),last_offset,last_sn))        
        total += 1

    end_time = time.time()
    client.stop()
    run_time = end_time - start_time
    print("Received {} messages in {} seconds".format(total, run_time))

except KeyboardInterrupt:
    pass
finally:
    client.stop()

после выполнения кода возвращаются только данные события из указанного смещения. Снимок экрана, как показано ниже:

enter image description here

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...