Считывание данных avro с помощью блоков данных из озера данных Azure Gen1, созданных с помощью Azure EventHubs Capture не удается - PullRequest
1 голос
/ 01 декабря 2019

Я пытаюсь прочитать avro-данные из Azure Data Lake Gen1, сгенерированные из Azure EventHub с включенным захватом концентраторов событий Azure в Azure Databricks с pyspark:

inputdata = "evenhubscapturepath/*/*"
rawData = spark.read.format("avro").load(inputdata)

Сбой следующего оператора

rawData.count()

с

org.apache.spark.SparkException: Job aborted due to stage failure: Task 162 in stage 48.0 failed 4 times, most recent failure: Lost task 162.3 in stage 48.0 (TID 2807, 10.3.2.4, executor 1): java.io.IOException: Not an Avro data file

Записывает ли EventHub-Capture данные не в формате Avro? Есть ли рекомендации по чтению захваченных в EventHub данных с помощью Spark?

Ответы [ 2 ]

2 голосов
/ 08 декабря 2019

Один шаблон, реализующий путь холодного проглатывания, использует Event Hubs Capture . При захвате событий EventHubs записывает по одному файлу на раздел, как определено в параметрах . Данные записаны в формате avro и могут быть проанализированы с помощью Apache Spark.

Итак, каковы лучшие практики использования этой функции?

1. Не преувеличивайте

Часто я видел людей, использующих конфигурацию по умолчанию, что в итоге часто приводит к множеству маленьких файлов. Если вы хотите использовать данные, загруженные через EventHubs Capture с Spark, имейте в виду рекомендации для размеров файлов в хранилище озера данных Azure и разделов с Spark. Размер файла должен составлять ~ 256 МБ, а разделов - от 10 до 50 ГБ. Итак, наконец, конфигурация зависит от количества и размеров сообщений, которые вы потребляете. В большинстве случаев у вас все в порядке, просто разбивая данные по дате загрузки.

2. Установите флажок «Не выбрасывать пустые файлы»

Вы должны установить флажок «Не выбрасывать пустые файлы». Если вы хотите использовать данные с Spark, который сохраняет ненужные файловые операции.

3. Используйте источник данных в своих файловых путях

С потоковой архитектурой ваш EventHub - это то, что Landing Zone будет в подходе пакетно-ориентированной архитектуры. Таким образом, вы будете получать данные в слое raw-data. Хорошей практикой является использование источников данных вместо имени EventHub в пути к каталогу. Так, например, если вы принимаете данные телеметрии от роботов на вашем заводе, это может быть путь к каталогу / raw / robots /

Для именования хранилища требуются все атрибуты, такие как {Namesapce}, {PartitionId} быть использованным. Итак, наконец, хорошее определение формата файла захвата с явно заданным путем, ежедневным разделением и использованием оставшихся атрибутов для имени файла в Azure Data Lake Gen 2 может выглядеть так:

 /raw/robots/ingest_date={Year}-{Month}-{Day}/{Hour}{Minute}{Second}-{Namespace}-{EventHub}-{PartitionId}

enter image description here

4. Подумайте о задании на сжатие

Захваченные данные не сжимаются и могут также оказаться в небольших файлах в вашем случае использования (минимальная частота записи составляет 15 минут). Поэтому, если необходимо, напишите задание на уплотнение, выполняемое один раз в день. Что-то вроде

df.repartition(5).write.format("avro").save(targetpath)

выполнит эту работу.

Итак, какие сейчас лучшие практики для чтения захваченных данных?

5. Игнорировать не avro-файлы для чтения данных

Azure EventHubs Capture записывает временные данные в Azure Data Lake Gen1. Рекомендуется только читать данные с расширением avro. Вы можете легко добиться этого с помощью конфигурации искры:

spark.conf.set("avro.mapred.ignore.inputs.without.extension", "true")

6. Только для чтения соответствующих разделов

Рекомендуется читать только соответствующие разделы, например, фильтровать текущий день приема.

7. Использовать общие метаданные

Чтение захваченных данных работает аналогично чтению данных непосредственно из Azure EventHubs. Таким образом, вы должны иметь схему. Предполагая, что у вас также есть задания по чтению данных непосредственно с помощью Spark Structured Streaming, хорошим шаблоном является сохранение метаданных и их совместное использование. Вы можете просто сохранить эти метаданные в json-файле Data Lake Store:

[{"MeasurementTS":"timestamp","Location":"string", "Temperature":"double"}]

и прочитать его с помощью этой простой функции синтаксического анализа :

# parse the metadata to get the schema
from collections import OrderedDict 
from pyspark.sql.types import *
import json

ds = dbutils.fs.head (metadata)                                                 # read metadata file

items = (json
  .JSONDecoder(object_pairs_hook=OrderedDict)
  .decode(ds)[0].items())

#Schema mapping 
mapping = {"string": StringType, "integer": IntegerType, "double" : DoubleType, "timestamp" : TimestampType, "boolean" : BooleanType}

schema = StructType([
    StructField(k, mapping.get(v.lower())(), True) for (k, v) in items])

Так что вы можетепросто повторно используйте вашу схему:

from pyspark.sql.functions import *

parsedData = spark.read.format("avro").load(rawpath). \
  selectExpr("EnqueuedTimeUtc", "cast(Body as string) as json") \
 .select("EnqueuedTimeUtc", from_json("json", schema=Schema).alias("data")) \
 .select("EnqueuedTimeUtc", "data.*")
1 голос
/ 02 декабря 2019

Убедитесь, что входные данные представляют собой файл " .avro ".

Поскольку модуль spark-avro является внешним, в DataFrameReader или DataFrameWriter отсутствует API-интерфейс .avro.

Для загрузки / сохранения данных в формате Avro необходимо указать формат параметра источника данных как avro (или org.apache.spark.sql.avro).

Пример:

Python
df = spark.read.format("avro").load("examples/src/main/resources/users.avro")

ИЛИ

#storage->avro
avroDf = spark.read.format("com.databricks.spark.avro").load(in_path)

Подробнее см. По ссылкам ниже:

https://spark.apache.org/docs/latest/sql-data-sources-avro.html

http://blog.itaysk.com/2017/01/14/processing-event-hub-capture-files-using-spark

https://medium.com/@caiomsouza/processing-event-hubs-capture-files-avro-format-using-spark-azure-databricks-save-to-parquet-95259001d85f

Надеюсь, это поможет.

...