Исходная ситуация
Сериализированные события AVRO отправляются в концентратор событий Azure. Эти события хранятся постоянно с помощью функции захвата концентраторов событий Azure. Захваченные данные вместе с метаданными концентратора событий записываются в формате Apache Avro. Исходные события, содержащиеся в файле авро захвата, должны быть проанализированы с использованием (py) Spark.
Вопрос
Как десериализовать сериализованное событие AVRO, которое содержится в поле / столбце файла AVRO, с использованием (py) Spark? (Аннотация: avro-схема события не известна приложению читателя, но она содержится в сообщении в виде заголовка avro)
Фон
Фон является аналитической платформой для сценария IoT. Сообщения предоставляются платформой IoT, работающей на kafka. Чтобы быть более гибким с изменениями схемы, стратегическое решение - придерживаться формата avro.
Чтобы разрешить использование Azure Stream Analytics (ASA), для каждого сообщения указывается схема avro (в противном случае ASA не сможет десериализовать сообщение).
захват файла авро схемы
Схема файлов avro, созданных функцией захвата концентратора событий, приведена ниже:
{
"type":"record",
"name":"EventData",
"namespace":"Microsoft.ServiceBus.Messaging",
"fields":[
{"name":"SequenceNumber","type":"long"},
{"name":"Offset","type":"string"},
{"name":"EnqueuedTimeUtc","type":"string"},
{"name":"SystemProperties","type":{"type":"map","values":["long","double","string","bytes"]}},
{"name":"Properties","type":{"type":"map","values":["long","double","string","bytes"]}},
{"name":"Body","type":["null","bytes"]}
]
}
(обратите внимание, что фактическое сообщение хранится в поле тела в виде байтов)
пример авро схемы
Для иллюстрации я отправил события со следующей схемой avro в концентратор событий:
{
"type" : "record",
"name" : "twitter_schema",
"namespace" : "com.test.avro",
"fields" : [
{"name" : "username","type" : "string"},
{"name" : "tweet","type" : "string"},
{"name" : "timestamp","type" : "long"}
],
}
пример события
{
"username": "stackoverflow",
"tweet": "please help deserialize me",
"timestamp": 1366150681
}
пример полезных данных сообщения avro
(кодируется как строка / примечание, что схема avro включена)
Objavro.schema�{"type":"record","name":"twitter_schema","namespace":"com.test.avro","fields":[{"name":"username","type":"string"},{"name":"tweet","type":"string"},{"name":"timestamp","type":"long"}]}
Таким образом, в конце эта полезная нагрузка будет сохранена в виде байтов в поле «Body» файла захвата avro.
.
.
Мой текущий подход
Для простоты использования, тестирования и отладки в настоящее время я использую ноутбук pyspark jupyter.
Сессия Config of Spark:
%%configure
{
"conf": {
"spark.jars.packages": "com.databricks:spark-avro_2.11:4.0.0"
}
}
чтение файла avro в кадр данных и вывод результата:
capture_df = spark.read.format("com.databricks.spark.avro").load("[pathToCaptureAvroFile]")
capture_df.show()
результат:
+--------------+------+--------------------+----------------+----------+--------------------+
|SequenceNumber|Offset| EnqueuedTimeUtc|SystemProperties|Properties| Body|
+--------------+------+--------------------+----------------+----------+--------------------+
| 71| 9936|11/4/2018 4:59:54 PM| Map()| Map()|[4F 62 6A 01 02 1...|
| 72| 10448|11/4/2018 5:00:01 PM| Map()| Map()|[4F 62 6A 01 02 1...|
получение содержимого поля Body и приведение его к строке:
msgRdd = capture_df.select(capture_df.Body.cast("string")).rdd.map(lambda x: x[0])
Вот как далеко я получил работающий код. Потратил много времени, пытаясь десериализовать реальное сообщение, но безуспешно. Буду признателен за любую помощь!
Некоторая дополнительная информация:
Spark работает в кластере Microsoft Azure HDInsight 3.6. Версия Spark 2.2. Версия Python 2.7.12.