У меня есть скрипт Azure Databricks в Python, который читает сообщения JSON из Event Hub с использованием структурированной потоковой передачи, обрабатывает сообщения и сохраняет результаты в хранилище озера данных.Сообщения отправляются в концентратор событий из приложения логики Azure, которое читает твиты из API Twitter.
Я пытаюсь десериализовать тело сообщения концентратора событий, чтобы обработать его содержимое.Тело сообщения сначала преобразуется из двоичного в строковое значение, а затем десериализуется в тип структуры с помощью функции from_json
, как описано в этой статье: https://databricks.com/blog/2017/02/23/working-complex-data-formats-structured-streaming-apache-spark-2-1.html
Вот пример кода (с запутанными параметрами):
from pyspark.sql.functions import from_json, to_json
from pyspark.sql.types import DateType, StringType, StructType
EVENT_HUB_CONN_STRING = 'Endpoint=sb://myehnamespace.servicebus.windows.net/;SharedAccessKeyName=Listen;SharedAccessKey=xxx;EntityPath=myeh'
OUTPUT_DIR = '/mnt/DataLake/output'
CHECKPOINT_DIR = '/mnt/DataLake/checkpoint'
event_hub_conf = {
'eventhubs.connectionString' : EVENT_HUB_CONN_STRING
}
stream_data = spark \
.readStream \
.format('eventhubs') \
.options(**event_hub_conf) \
.option('multiLine', True) \
.option('mode', 'PERMISSIVE') \
.load()
schema = StructType() \
.add('FetchTimestampUtc', DateType()) \
.add('Username', StringType()) \
.add('Name', StringType()) \
.add('TweetedBy', StringType()) \
.add('Location', StringType()) \
.add('TweetText', StringType())
stream_data_body = stream_data \
.select(stream_data.body) \
.select(from_json('body', schema).alias('body')) \
.select(to_json('body').alias('body'))
# This works (bare string value, no deserialization):
# stream_data_body = stream_data.select(stream_data.body)
stream_data_body \
.writeStream \
.outputMode('append') \
.format('json') \
.option('path', OUTPUT_DIR) \
.option('checkpointLocation', CHECKPOINT_DIR) \
.start() \
.awaitTermination()
Здесь я на самом деле пока не выполняю никакой обработки, просто тривиальную десериализацию / сериализацию.
Приведенный выше скрипт действительно выводит данные в Data Lake, но полученные объекты JSON пусты,Вот пример вывода:
{}
{}
{}
Закомментированный код в скрипте действительно производит вывод, но это всего лишь строковое значение, поскольку мы не включили десериализацию:
{"body":"{\"FetchTimestampUtc\": 2018-10-16T09:21:40.6173187Z, \"Username\": ... }}
Мне было интересно, если обратные слеши должны быть удвоены, как в примере, приведенном в ссылке выше?Это может быть выполнено с параметром options функции from_json
: «options для управления синтаксическим анализом. Принимает те же опции, что и источник данных json».Но я не нашел документации для формата опций.
Есть идеи, почему не работает десериализация / сериализация?