Загрузка avro файлов в фреймы данных pyspark из hdfs - PullRequest
0 голосов
/ 03 сентября 2018

Я построил небольшой конвейер данных, который перемещает некоторые вымышленные тестовые данные из локального каталога (в формате json) в hdfs (формат avro). Похоже, что это сработало правильно (никаких ошибок не показывало flume), но может быть так, что ошибка лежит уже здесь. Следующим шагом было преобразование файла avro в некоторый фрейм данных pyspark с помощью загрузчика databricks (для этого я смог найти только библиотеку python). Теперь позвольте мне объяснить, как я это сделал, чтобы вы могли увидеть, где я мог потерпеть неудачу:

1) Создание файлов avro из файла json с помощью flume

Моя цель - перенести данные json из локального каталога в HDFS, чтобы я мог проанализировать их с помощью pySpark. Для этого я использую Flume. Поскольку у json плохая компрессия в HDFS, я также конвертирую каждый файл в avro, используя следующий файл flume.conf:

agent.sources.tail.type = exec
agent.sources.tail.shell = /bin/bash -c
agent.sources.tail.command = cat /home/user/Data/json/*
agent.sources.tail.batchsize = 10
agent.sources.tail.channels = MemChannel

agent.channels.MemChannel.type = memory
agent.channels.MemChannel.capacity = 100
agent.channels.MemChannel.transactionCapacity = 100


agent.sinks.HDFS.channel = MemChannel
agent.sinks.HDFS.type = hdfs
agent.sinks.HDFS.hdfs.fileType = DataStream
agent.sinks.HDFS.hdfs.fileSuffix=.avro
agent.sinks.HDFS.hdfs.path = hdfs://localhost:9000/home/user/Data/hdfs/test_data
agent.sinks.HDFS.hdfs.batchSize = 100
agent.sinks.HDFS.hdfs.rollSize = 0
agent.sinks.HDFS.hdfs.rollCount = 100
agent.sinks.HDFS.serializer=avro_event
agent.sinks.HDFS.serializer.compressionCodec=snappy

Это запустилось без ошибок, поэтому я предполагаю, что flume переместил каждый файл как правильный файл avro в HDFS.

2) Создание кадра данных путем загрузки файла avro

Теперь наступает момент, когда я пытаюсь прочитать один файл avro в виде фрейма данных в pyspark:

from pyspark.sql.types import *
from pyspark.sql import SQLContext
from pyspark import SparkContext
sc = SparkContext()
sqlContext = SQLContext(sc)
sqlContext.setConf("spark.sql.avro.compression.codec","snappy") 

# creates a dataframe by reading a single avro file 
df = sqlContext.read.format("com.databricks.spark.avro").load("hdfs://localhost:9000/home/user/Data/hdfs/test_data/FlumeData.1535723039267.avro")

Это показывает мне следующий (неправильный) вывод:

df.show()
+-------+--------------------+
|headers|                body|
+-------+--------------------+
|     []|[7B 22 63 61 74 6...|
|     []|[7B 22 63 61 74 6...|
|     []|[7B 22 63 61 74 6...|
|     []|[7B 22 63 61 74 6...|
|     []|[7B 22 63 61 74 6...|
+-------+--------------------+
only showing top 5 rows

Это явно не то, что я хочу, так как весь приведенный выше код, похоже, читает файл avro как обычный текстовый файл, и, следовательно, нет проанализированной структуры. Раньше я просто создавал фрейм данных, который использовал те же данные, но хранились в исходном файле json.

# creates a dataframe by reading a single json file
df = sqlContext.read.json('hdfs://localhost:9000/home/user/Data/hdfs/test_data/FlumeData.1535702513118.json')

Вот так должен выглядеть желаемый (правильный) вывод:

df.show()
+---------------+--------------------+---+-------------------+-----------------+
|       category|             content| id|          timestamp|             user|
+---------------+--------------------+---+-------------------+-----------------+
|              A|Million near orde...|801|2018-08-30_16:49:53|      Molly Davis|
|              D|Determine company...|802|2018-08-30_16:49:53|       Ronnie Liu|
|              B|Among themselves ...|803|2018-08-30_16:49:53|       Lori Brown|
|              C|Through various d...|804|2018-08-30_16:49:53|   Judith Herrera|
|              C|Week toward so co...|805|2018-08-30_16:49:53|Teresa Cunningham|
+---------------+--------------------+---+-------------------+-----------------+
only showing top 5 rows

Как мне добиться того же результата для моего конвертированного файла avro?

...