Я построил небольшой конвейер данных, который перемещает некоторые вымышленные тестовые данные из локального каталога (в формате json) в hdfs (формат avro). Похоже, что это сработало правильно (никаких ошибок не показывало flume), но может быть так, что ошибка лежит уже здесь. Следующим шагом было преобразование файла avro в некоторый фрейм данных pyspark с помощью загрузчика databricks (для этого я смог найти только библиотеку python). Теперь позвольте мне объяснить, как я это сделал, чтобы вы могли увидеть, где я мог потерпеть неудачу:
1) Создание файлов avro из файла json с помощью flume
Моя цель - перенести данные json из локального каталога в HDFS, чтобы я мог проанализировать их с помощью pySpark. Для этого я использую Flume. Поскольку у json плохая компрессия в HDFS, я также конвертирую каждый файл в avro, используя следующий файл flume.conf:
agent.sources.tail.type = exec
agent.sources.tail.shell = /bin/bash -c
agent.sources.tail.command = cat /home/user/Data/json/*
agent.sources.tail.batchsize = 10
agent.sources.tail.channels = MemChannel
agent.channels.MemChannel.type = memory
agent.channels.MemChannel.capacity = 100
agent.channels.MemChannel.transactionCapacity = 100
agent.sinks.HDFS.channel = MemChannel
agent.sinks.HDFS.type = hdfs
agent.sinks.HDFS.hdfs.fileType = DataStream
agent.sinks.HDFS.hdfs.fileSuffix=.avro
agent.sinks.HDFS.hdfs.path = hdfs://localhost:9000/home/user/Data/hdfs/test_data
agent.sinks.HDFS.hdfs.batchSize = 100
agent.sinks.HDFS.hdfs.rollSize = 0
agent.sinks.HDFS.hdfs.rollCount = 100
agent.sinks.HDFS.serializer=avro_event
agent.sinks.HDFS.serializer.compressionCodec=snappy
Это запустилось без ошибок, поэтому я предполагаю, что flume переместил каждый файл как правильный файл avro в HDFS.
2) Создание кадра данных путем загрузки файла avro
Теперь наступает момент, когда я пытаюсь прочитать один файл avro в виде фрейма данных в pyspark:
from pyspark.sql.types import *
from pyspark.sql import SQLContext
from pyspark import SparkContext
sc = SparkContext()
sqlContext = SQLContext(sc)
sqlContext.setConf("spark.sql.avro.compression.codec","snappy")
# creates a dataframe by reading a single avro file
df = sqlContext.read.format("com.databricks.spark.avro").load("hdfs://localhost:9000/home/user/Data/hdfs/test_data/FlumeData.1535723039267.avro")
Это показывает мне следующий (неправильный) вывод:
df.show()
+-------+--------------------+
|headers| body|
+-------+--------------------+
| []|[7B 22 63 61 74 6...|
| []|[7B 22 63 61 74 6...|
| []|[7B 22 63 61 74 6...|
| []|[7B 22 63 61 74 6...|
| []|[7B 22 63 61 74 6...|
+-------+--------------------+
only showing top 5 rows
Это явно не то, что я хочу, так как весь приведенный выше код, похоже, читает файл avro как обычный текстовый файл, и, следовательно, нет проанализированной структуры. Раньше я просто создавал фрейм данных, который использовал те же данные, но хранились в исходном файле json.
# creates a dataframe by reading a single json file
df = sqlContext.read.json('hdfs://localhost:9000/home/user/Data/hdfs/test_data/FlumeData.1535702513118.json')
Вот так должен выглядеть желаемый (правильный) вывод:
df.show()
+---------------+--------------------+---+-------------------+-----------------+
| category| content| id| timestamp| user|
+---------------+--------------------+---+-------------------+-----------------+
| A|Million near orde...|801|2018-08-30_16:49:53| Molly Davis|
| D|Determine company...|802|2018-08-30_16:49:53| Ronnie Liu|
| B|Among themselves ...|803|2018-08-30_16:49:53| Lori Brown|
| C|Through various d...|804|2018-08-30_16:49:53| Judith Herrera|
| C|Week toward so co...|805|2018-08-30_16:49:53|Teresa Cunningham|
+---------------+--------------------+---+-------------------+-----------------+
only showing top 5 rows
Как мне добиться того же результата для моего конвертированного файла avro?