Как передать набор входных файлов (не каталогов) для запуска работы и создать фрейм данных поверх этих файлов - PullRequest
0 голосов
/ 28 сентября 2019

Я хотел бы передать набор файлов avro в качестве входных данных в работу Spark и создать фрейм данных поверх этих файлов.(Я не хочу помещать файлы в каталог и передавать каталог в качестве входных данных).

В оболочке Spark я могу успешно создать фрейм данных, как показано ниже.

val DF = hiveContext.read.format("com.databricks.spark.avro").load("/data/year=2019/month=09/day=28/hour=01/data_1.1569650402704.avro","/data/year=2019/month=09/day=28/hour=01/data_2.1569650402353.avro")

Ното же самое не удается, когда я пытаюсь выполнить команду spark-submit.

Чтобы передать avro-файлы независимо в работу spark, я пытаюсь поместить avro-файлы в текстовый файл и передать этот файл в качестве входного аргумента.в класс Driver.

textFile:

/data/year=2019/month=09/day=28/hour=01/data_1.1569650402704.avro
/data/year=2019/month=09/day=28/hour=01/data_2.1569650402353.avro

spark-submit --class Spark_submit_test --master yarn Spark_submit_test.jar textFile 
val filename = args(0)
val files = Source.fromFile(filename).getLines
val fileList = files.mkString(",")
println("fileList : "+fileList)

=> Это печатает

fileList : /data/ASDS/PNR/archive/year=2019/month=09/day=28/hour=01/data_1.1569650402704.avro,/data/ASDS/PNR/archive/year=2019/month=09/day=28/hour=01/data_2.1569650402353.avro
val DF = hiveContext.read.format("com.databricks.spark.avro").load(fileList)

Получение ниже исключения:

Exception in thread "main" java.io.FileNotFoundException: File hdfs://bdaolc01-ns/data/ASDS/PNR/archive/year=2019/month=09/day=28/hour=01/data_1.1569650402704.avro,/data/ASDS/PNR/archive/year=2019/month=09/day=28/hour=01/data_2.1569650402353.avro does not exist.

Не уверен, как я могу избежать добавления "hdfs://bdaolc01-ns" в начале.Пожалуйста, исправьте меня, если я делаю неправильно, или предложите лучший подход для того же.

Примечание: я пытался заключить имена файлов в двойные кавычки, но безрезультатно.

Ожидаемый результат: Датафрейм должен быть успешно создан, а df.printSchema должна перечислять правильную схему файлов avro.

1 Ответ

0 голосов
/ 29 сентября 2019

Требуется оператор сплат!

myList: _ *

scala> val data = spark.read.parquet(paths: _*)
data: org.apache.spark.sql.DataFrame = [id: bigint, a: int ... 1 more field]    

scala> val paths = List("/tmp/example-parquet/part-00000-38cd8823-bff7-46f0-82a0-13d1d00ecce5-c000.snappy.parquet")
paths: List[String] = List(/tmp/example-parquet/part-00000-38cd8823-bff7-46f0-82a0-13d1d00ecce5-c000.snappy.parquet)

scala> val data = spark.read.parquet(paths: _*)
data: org.apache.spark.sql.DataFrame = [id: bigint, a: int ... 1 more field]

scala> data.count
res0: Long = 12500000
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...