Не получать сообщений при чтении Azure Event Hub с Python с использованием EventHubClient - PullRequest
0 голосов
/ 28 ноября 2018

У меня есть концентратор событий Azure, в котором есть сообщения.Я написал сообщения с приложением Python и вижу правильный счетчик сообщений в графическом интерфейсе Event Hub.Но я не могу читать сообщения с Python.Мой код ниже.Он работает без ошибок, но дает нулевые результаты.

Как ни странно, после запуска этого кода графический интерфейс Event Hub показывает, что все сообщения (пара тысяч) были исходящими, что указывает на то, что моя программа действительно их получила.Но код никогда не отображает их.

Любая помощь приветствуется!

Результат всегда ...

Msg offset: <azure.eventhub.common.Offset object at 0x102fc4e10>
Msg seq: 0
Msg body: 0

Received 1 messages in 0.11292386054992676 seconds

++++++++++++

# pip install azure-eventhub

import logging
import time
from azure.eventhub import EventHubClient, Receiver, Offset

logger = logging.getLogger("azure")

# URL of the event hub, amqps://<mynamespace>.servicebus.windows.net/myeventhub
ADDRESS = "amqps://chc-eh-ns.servicebus.windows.net/chc-eh"

# Access tokens for event hub namespace, from Azure portal for namespace
USER = "RootManageSharedAccessKey"
KEY = "XXXXXXXXXXXXXXXXXXXXXXXXXX"

# Additional setup to receive events
CONSUMER_GROUP = "$default"   # our view of the event hub, useful when there is more than one consumer at same time
PARTITION = "0"   # which stream within event hub
OFFSET = Offset("-1")  # get all msgs in event hub. msgs are never removed, they just expire per event hub settings
PREFETCH = 100   # not sure exactly what this does ??

# Initialize variables
total = 0
last_sn = -1
last_offset = -1

client = EventHubClient(ADDRESS, debug=False, username=USER, password=KEY)
try:
    receiver = client.add_receiver(CONSUMER_GROUP, PARTITION, prefetch=PREFETCH, offset=OFFSET)
    client.run()
    start_time = time.time()
    for event_data in receiver.receive(timeout=100):
        last_offset = event_data.offset
        last_sn = event_data.sequence_number
        print("Msg offset: " + str(last_offset))
        print("Msg seq: " + str(last_sn))
        print("Msg body: " + event_data.body_as_str())
        total += 1

    end_time = time.time()
    client.stop()
    run_time = end_time - start_time
    print("\nReceived {} messages in {} seconds".format(total, run_time))

except KeyboardInterrupt:
    pass

finally:
    client.stop()

1 Ответ

0 голосов
/ 29 ноября 2018

Понял!Пример кода, который я скопировал, был неверным.Вы должны получать пакеты сообщений, а затем перебирать каждый пакет.Вот правильный код ...

# pip install azure-eventhub

import time
from azure.eventhub import EventHubClient, Offset

# URL of the event hub, amqps://<mynamespace>.servicebus.windows.net/myeventhub
ADDRESS = "amqps://chc-eh-ns.servicebus.windows.net/chc-eh"

# Access tokens for event hub namespace, from Azure portal for namespace
USER = "RootManageSharedAccessKey"
KEY = "XXXXXXXXXXXX"

# Additional setup to receive events
CONSUMER_GROUP = "$default"   # our view of the event hub, useful when there is more than one consumer at same time
PARTITION = "0"   # which stream within event hub
OFFSET = Offset("-1")  # get all msgs in event hub. msgs are never removed, they just expire per event hub settings
PREFETCH = 100   # batch size ??

# Initialize variables
total = 0
last_sn = -1
last_offset = -1

client = EventHubClient(ADDRESS, debug=False, username=USER, password=KEY)
try:
    receiver = client.add_receiver(CONSUMER_GROUP, PARTITION, prefetch=PREFETCH, offset=OFFSET)
    client.run()
    start_time = time.time()
    batch = receiver.receive(timeout=5000)
    while batch:
        for event_data in batch:
            last_offset = event_data.offset
            last_sn = event_data.sequence_number
            print("Msg offset: " + str(last_offset))
            print("Msg seq: " + str(last_sn))
            print("Msg body: " + event_data.body_as_str())
            total += 1
        batch = receiver.receive(timeout=5000)
    end_time = time.time()
    client.stop()
    run_time = end_time - start_time
    print("\nReceived {} messages in {} seconds".format(total, run_time))

except KeyboardInterrupt:
    pass

finally:
    client.stop()
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...