Десериализация сообщений концентратора событий в блоках данных Azure - PullRequest
0 голосов
/ 16 октября 2018

У меня есть скрипт Azure Databricks в Python, который читает сообщения JSON из Event Hub с использованием структурированной потоковой передачи, обрабатывает сообщения и сохраняет результаты в хранилище озера данных.Сообщения отправляются в концентратор событий из приложения логики Azure, которое читает твиты из API Twitter.

Я пытаюсь десериализовать тело сообщения концентратора событий, чтобы обработать его содержимое.Тело сообщения сначала преобразуется из двоичного в строковое значение, а затем десериализуется в тип структуры с помощью функции from_json, как описано в этой статье: https://databricks.com/blog/2017/02/23/working-complex-data-formats-structured-streaming-apache-spark-2-1.html

Вот пример кода (с запутанными параметрами):

from pyspark.sql.functions import from_json, to_json
from pyspark.sql.types import DateType, StringType, StructType

EVENT_HUB_CONN_STRING = 'Endpoint=sb://myehnamespace.servicebus.windows.net/;SharedAccessKeyName=Listen;SharedAccessKey=xxx;EntityPath=myeh'
OUTPUT_DIR = '/mnt/DataLake/output'
CHECKPOINT_DIR = '/mnt/DataLake/checkpoint'

event_hub_conf = {
    'eventhubs.connectionString' : EVENT_HUB_CONN_STRING
}

stream_data = spark \
    .readStream \
    .format('eventhubs') \
    .options(**event_hub_conf) \
    .option('multiLine', True) \
    .option('mode', 'PERMISSIVE') \
    .load()

schema = StructType() \
    .add('FetchTimestampUtc', DateType()) \
    .add('Username', StringType()) \
    .add('Name', StringType()) \
    .add('TweetedBy', StringType()) \
    .add('Location', StringType()) \
    .add('TweetText', StringType())

stream_data_body = stream_data \
    .select(stream_data.body) \
    .select(from_json('body', schema).alias('body')) \
    .select(to_json('body').alias('body'))

# This works (bare string value, no deserialization):
# stream_data_body = stream_data.select(stream_data.body)

stream_data_body \
    .writeStream \
    .outputMode('append') \
    .format('json') \
    .option('path', OUTPUT_DIR) \
    .option('checkpointLocation', CHECKPOINT_DIR) \
    .start() \
    .awaitTermination()

Здесь я на самом деле пока не выполняю никакой обработки, просто тривиальную десериализацию / сериализацию.

Приведенный выше скрипт действительно выводит данные в Data Lake, но полученные объекты JSON пусты,Вот пример вывода:

{}
{}
{}

Закомментированный код в скрипте действительно производит вывод, но это всего лишь строковое значение, поскольку мы не включили десериализацию:

{"body":"{\"FetchTimestampUtc\": 2018-10-16T09:21:40.6173187Z, \"Username\": ... }}

Мне было интересно, если обратные слеши должны быть удвоены, как в примере, приведенном в ссылке выше?Это может быть выполнено с параметром options функции from_json: «options для управления синтаксическим анализом. Принимает те же опции, что и источник данных json».Но я не нашел документации для формата опций.

Есть идеи, почему не работает десериализация / сериализация?

1 Ответ

0 голосов
/ 29 ноября 2018

Похоже, что входной JSON должен иметь определенный синтаксис.Значения полей должны быть строками, временные метки не допускаются (и, возможно, то же самое относится и к целым числам, числам с плавающей запятой и т. Д.).Преобразование типов должно выполняться внутри скрипта Databricks.

Я изменил входной JSON, чтобы значение метки времени было заключено в кавычки.В схеме я также изменил DateType на TimestampType (что более уместно), а НЕ на StringType.

, используя следующее выражение выбора:

stream_data_body = stream_data \
    .select(from_json(stream_data.body.cast('string'), schema).alias('body')) \
    .select(to_json('body').alias('body'))

в выходном файле создается следующий вывод:

{"body":"{\"FetchTimestampUtc\":\"2018-11-29T21:26:40.039Z\",\"Username\":\"xyz\",\"Name\":\"x\",\"TweetedBy\":\"xyz\",\"Location\":\"\",\"TweetText\":\"RT @z123: I just want to say thanks to everyone who interacts with me, whether they talk or they just silently rt or like, thats okay.…\"}"}

, который является видом ожидаемого результата, хотя значение метки времени выводится как строковое значение.Фактически весь объект body выводится в виде строки.

Мне не удалось заставить работу работать, если входной формат соответствует JSON с собственными типами полей.В этом случае вывод from_json всегда равен нулю.

РЕДАКТИРОВАТЬ: Это, кажется, было путаница с моей стороны.Значения даты всегда должны заключаться в кавычки в формате JSON, они не являются «нативными» типами.

Я проверил, что целочисленные значения и значения с плавающей точкой могут передаваться без кавычек, чтобы с ними можно было выполнять вычисления.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...