Обработка данных концентратора событий с использованием Python - PullRequest
0 голосов
/ 06 сентября 2018

Я использую Azure концентратор событий Python SDK для отправки и получения сообщений от концентратора событий по этой ссылке. https://github.com/Azure/azure-event-hubs-python/tree/develop. Я могу успешно отправлять и получать сообщения. Но как мне проанализировать сообщения и извлечь данные из объекта данных события. Пожалуйста, найдите код ниже.

import os
import sys
#import logging
from azure.eventhub import EventHubClient, Receiver, Offset

ADDRESS = 'sb://####.servicebus.windows.net/#####'
USER = '##########'
KEY = '##################################'
CONSUMER_GROUP = "$default"
OFFSET = Offset("-1")
PARTITION = "1"


total = 0
last_sn = -1
last_offset = "-1"

try:
  if not ADDRESS:
      raise ValueError("No EventHubs URL supplied.")
  client = EventHubClient(ADDRESS, debug=False, username=USER, password=KEY)
  receiver = client.add_receiver(CONSUMER_GROUP, PARTITION, prefetch=5000, 
  offset=OFFSET)
  client.run()
  try:
      batched_events = receiver.receive(timeout=20)
  except:
      raise
  finally:
      client.stop()
  for event_data in batched_events:
      last_offset = event_data.offset.value
      last_sn = event_data.sequence_number
      total += 1
      print("Partition {}, Received {}, sn={} offset={}".format(
         PARTITION,
         total,
         last_sn,
         last_offset))

except KeyboardInterrupt:
   pass

Если я попытаюсь просмотреть полученные event_data, я могу увидеть следующее сообщение. event_data <azure.eventhub.common.EventData at 0xd4f1358> event_data.message

<uamqp.message.Message at 0xd4f1240>

Любая помощь по поводу того, как разобрать это сообщение для извлечения данных.

Ответы [ 2 ]

0 голосов
/ 02 декабря 2018

Начиная с 1.1.0, существуют новые служебные методы для извлечения фактических данных сообщения:

Итак, что раньше было

import json
event_obj = json.loads(next(event_data.body).decode('UTF-8'))

Сейчас:

event_obj = event_data.body_as_json()
0 голосов
/ 07 сентября 2018

Согласно исходному коду на
https://github.com/Azure/azure-event-hubs-python/blob/master/azure/eventhub/common.py#L185 event_data.body должен вернуть вам тело сообщения.
body возвращает генератор. Чтобы получить доступ к отдельному сообщению по одному, используйте метод next.

messages = event_data.body
next(messages) -> get 1st message
next(messages) -> get 2nd message
...
next(messages) -> get the last message
next(messages) -> StopIteration

StopIteration исключение относится к концу итерации, больше не осталось сообщений для извлечения.

Если вы хотите получить все сообщения одновременно, вы можете использовать - list(event_data.body)

Используя эти потрясающие встроенные функции Python, вы могли бы найти ответ самостоятельно:

  • Используйте встроенную функцию Python dir, чтобы узнать, какие все методы и значения поддерживает объект. Если вы сделаете dir(event_data), вы увидите метод - body в списке
  • Чтобы просмотреть документацию (конкретные строки документации) о любом методе / функции / классе, вы можете сделать, print object.__doc__. В вашем случае выполнение print event_data.body.__doc__ напечатает то, что делает body.
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...