Я скопировал этот скрипт, и он работает хорошо. Я получаю все события из eventhub:
import logging
from azure.eventhub import EventHubConsumerClient
connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
consumer_group = '<< CONSUMER GROUP >>'
eventhub_name = '<< NAME OF THE EVENT HUB >>'
client = EventHubConsumerClient.from_connection_string(connection_str, consumer_group, eventhub_name=eventhub_name)
logger = logging.getLogger("azure.eventhub")
logging.basicConfig(level=logging.INFO)
def on_event(partition_context, event):
logger.info("Received event from partition {}".format(partition_context.partition_id))
partition_context.update_checkpoint(event)
with client:
client.receive(
on_event=on_event,
starting_position="-1", # "-1" is from the beginning of the partition.
)
# receive events from specified partition:
# client.receive(on_event=on_event, partition_id='0')
Однако, как только я пытаюсь получить какое-либо значение из событий, я получаю исключение OWNERSHIP_LOST, и скрипт не получает ничего.
Я пробовал event.body.KEY
, event.KEY
, json.loads(event)['body']['key']
, json.loads(event)['key']
. Что бы я ни пытался сделать с events
, это вызывает OWNERSHIP_LOST.
Знаете ли вы обходной путь или случайно знаете, что я делаю здесь неправильно?