PySpark: десериализация сериализованного сообщения Avro, содержащегося в файле avro для захвата событий - PullRequest
0 голосов
/ 08 ноября 2018

Исходная ситуация

Сериализированные события AVRO отправляются в концентратор событий Azure. Эти события хранятся постоянно с помощью функции захвата концентраторов событий Azure. Захваченные данные вместе с метаданными концентратора событий записываются в формате Apache Avro. Исходные события, содержащиеся в файле авро захвата, должны быть проанализированы с использованием (py) Spark.


Вопрос

Как десериализовать сериализованное событие AVRO, которое содержится в поле / столбце файла AVRO, с использованием (py) Spark? (Аннотация: avro-схема события не известна приложению читателя, но она содержится в сообщении в виде заголовка avro)


Фон

Фон является аналитической платформой для сценария IoT. Сообщения предоставляются платформой IoT, работающей на kafka. Чтобы быть более гибким с изменениями схемы, стратегическое решение - придерживаться формата avro. Чтобы разрешить использование Azure Stream Analytics (ASA), для каждого сообщения указывается схема avro (в противном случае ASA не сможет десериализовать сообщение).

захват файла авро схемы

Схема файлов avro, созданных функцией захвата концентратора событий, приведена ниже:

{
    "type":"record",
    "name":"EventData",
    "namespace":"Microsoft.ServiceBus.Messaging",
    "fields":[
                 {"name":"SequenceNumber","type":"long"},
                 {"name":"Offset","type":"string"},
                 {"name":"EnqueuedTimeUtc","type":"string"},
                 {"name":"SystemProperties","type":{"type":"map","values":["long","double","string","bytes"]}},
                 {"name":"Properties","type":{"type":"map","values":["long","double","string","bytes"]}},
                 {"name":"Body","type":["null","bytes"]}
             ]
}

(обратите внимание, что фактическое сообщение хранится в поле тела в виде байтов)

пример авро схемы

Для иллюстрации я отправил события со следующей схемой avro в концентратор событий:

{
    "type" : "record",
    "name" : "twitter_schema",
    "namespace" : "com.test.avro",
    "fields" : [ 
                {"name" : "username","type" : "string"}, 
                {"name" : "tweet","type" : "string"},
                {"name" : "timestamp","type" : "long"}
    ],
}

пример события

{
    "username": "stackoverflow",
    "tweet": "please help deserialize me",
    "timestamp": 1366150681
}

пример полезных данных сообщения avro

(кодируется как строка / примечание, что схема avro включена)

Objavro.schema�{"type":"record","name":"twitter_schema","namespace":"com.test.avro","fields":[{"name":"username","type":"string"},{"name":"tweet","type":"string"},{"name":"timestamp","type":"long"}]}

Таким образом, в конце эта полезная нагрузка будет сохранена в виде байтов в поле «Body» файла захвата avro.

.
.


Мой текущий подход

Для простоты использования, тестирования и отладки в настоящее время я использую ноутбук pyspark jupyter.

Сессия Config of Spark:

%%configure
{
    "conf": {
        "spark.jars.packages": "com.databricks:spark-avro_2.11:4.0.0"
    }
}

чтение файла avro в кадр данных и вывод результата:

capture_df = spark.read.format("com.databricks.spark.avro").load("[pathToCaptureAvroFile]")
capture_df.show()

результат:

+--------------+------+--------------------+----------------+----------+--------------------+
|SequenceNumber|Offset|     EnqueuedTimeUtc|SystemProperties|Properties|                Body|
+--------------+------+--------------------+----------------+----------+--------------------+
|            71|  9936|11/4/2018 4:59:54 PM|           Map()|     Map()|[4F 62 6A 01 02 1...|
|            72| 10448|11/4/2018 5:00:01 PM|           Map()|     Map()|[4F 62 6A 01 02 1...|

получение содержимого поля Body и приведение его к строке:

msgRdd = capture_df.select(capture_df.Body.cast("string")).rdd.map(lambda x: x[0])

Вот как далеко я получил работающий код. Потратил много времени, пытаясь десериализовать реальное сообщение, но безуспешно. Буду признателен за любую помощь!

Некоторая дополнительная информация: Spark работает в кластере Microsoft Azure HDInsight 3.6. Версия Spark 2.2. Версия Python 2.7.12.

Ответы [ 3 ]

0 голосов
/ 07 декабря 2018

Полагаю, вы могли бы сделать что-то вроде:

jsonRdd = raw.select(raw.Body.cast("string"))
0 голосов
/ 19 апреля 2019

У меня была такая же проблема.

Версия Spark 2.4 решила проблему для меня.

Документацию вы найдете здесь: https://databricks.com/blog/2018/11/30/apache-avro-as-a-built-in-data-source-in-apache-spark-2-4.html

Примечание: вам нужно знать, как выглядит ваш AVRO-файл для создания вашей схемы (они просто загружают ее здесь).

Недостаток: в настоящее время он доступен только в Scala и Java. Насколько я знаю, это пока невозможно в Python.

0 голосов
/ 04 декабря 2018

То, что вы хотите сделать, это применить .decode('utf-8') к каждому элементу в столбце Body. Вы должны создать UDF из декодирования, чтобы вы могли применить его. UDF может быть создан с

from pyspark.sql import functions as f

decodeElements = f.udf(lambda a: a.decode('utf-8'))

Вот полный пример разбора avro-файлов, хранящихся в IoT-концентраторе, на пользовательскую конечную точку хранилища BLOB-объектов :

storage_account_name = "<YOUR STORACE ACCOUNT NAME>"
storage_account_access_key = "<YOUR STORAGE ACCOUNT KEY>"

# Read all files from one day. All PartitionIds are included. 
file_location = "wasbs://<CONTAINER>@"+storage_account_name+".blob.core.windows.net/<IoT Hub Name>/*/2018/11/30/*/*"
file_type = "avro"

# Read raw data
spark.conf.set(
  "fs.azure.account.key."+storage_account_name+".blob.core.windows.net",
  storage_account_access_key)

reader = spark.read.format(file_type).option("inferSchema", "true")
raw = reader.load(file_location)

# Decode Body into strings
from pyspark.sql import functions as f

decodeElements = f.udf(lambda a: a.decode('utf-8'))

jsons = raw.select(
    raw['EnqueuedTimeUtc'],
    raw['SystemProperties.connectionDeviceId'].alias('DeviceId'), 
    decodeElements(raw['Body']).alias("Json")
)

# Parse Json data
from pyspark.sql.functions import from_json

json_schema = spark.read.json(jsons.rdd.map(lambda row: row.Json)).schema
data = jsons.withColumn('Parsed', from_json('Json', json_schema)).drop('Json')

Disclamer: я новичок в Python и Databricks, и мое решение, вероятно, не идеально. Но я потратил больше дня, чтобы заставить это работать, и я надеюсь, что это может быть хорошей отправной точкой для кого-то.

...