Spark Чтение нескольких путей с автоматическим обнаружением разделов - PullRequest
0 голосов
/ 03 декабря 2018

Я пытаюсь прочитать некоторые файлы avro в DataFrame из нескольких путей.Допустим, мой путь - "s3a://bucket_name/path/to/file/year=18/month=11/day=01". По этому пути у меня есть еще два раздела, скажем, country=XX/region=XX

Я хочу прочитать несколько дат одновременно, без явного названия разделов страны и региона.Кроме того, я хочу, чтобы страна и регион были столбцами в этом фрейме данных.

sqlContext.read.format("com.databricks.spark.avro").load("s3a://bucket_name/path/to/file/year=18/month=11/day=01")

Эта строка прекрасно работает, поскольку я читаю только один путь.Он обнаруживает разделы страны и региона и выводит их схему.

Когда я пытаюсь прочитать несколько дат, скажем,

val paths = Seq("s3a://bucket_name/path/to/file/year=18/month=11/day=01", "s3a://bucket_name/path/to/file/year=18/month=11/day=02")

sqlContext.read.format("com.databricks.spark.avro").load(paths:_*)

Я получаю эту ошибку:

    18/12/03 03:13:53 WARN S3AbortableInputStream: Not all bytes were read from the S3ObjectInputStream, aborting HTTP connection. This is likely an error and may result insub-optimal behavior. Request only the bytes you need via a ranged GET or drain the input stream after use.
18/12/03 03:13:53 WARN S3AbortableInputStream: Not all bytes were read from the S3ObjectInputStream, aborting HTTP connection. This is likely an error and may result in sub-optimal behavior. Request only the bytes you need via a ranged GET or drain the input stream after use.
java.lang.AssertionError: assertion failed: Conflicting directory structures detected. Suspicious paths:?
 s3a://bucket_name/path/to/file/year=18/month=11/day=02
s3a://bucket_name/path/to/file/year=18/month=11/day=01

If provided paths are partition directories, please set "basePath" in the options of the data source to specify the root directory of the table. If there are multiple root directories, please load them separately and then union them.
        at scala.Predef$.assert(Predef.scala:179)
        at org.apache.spark.sql.execution.datasources.PartitioningUtils$.parsePartitions(PartitioningUtils.scala:106)
        at org.apache.spark.sql.sources.HadoopFsRelation.org$apache$spark$sql$sources$HadoopFsRelation$$discoverPartitions(interfaces.scala:621)
        at org.apache.spark.sql.sources.HadoopFsRelation$$anonfun$partitionSpec$3.apply(interfaces.scala:526)
        at org.apache.spark.sql.sources.HadoopFsRelation$$anonfun$partitionSpec$3.apply(interfaces.scala:525)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.sql.sources.HadoopFsRelation.partitionSpec(interfaces.scala:524)
        at org.apache.spark.sql.sources.HadoopFsRelation$$anonfun$partitionColumns$1.apply(interfaces.scala:578)
        at org.apache.spark.sql.sources.HadoopFsRelation$$anonfun$partitionColumns$1.apply(interfaces.scala:578)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.sql.sources.HadoopFsRelation.partitionColumns(interfaces.scala:578)
        at org.apache.spark.sql.sources.HadoopFsRelation.schema$lzycompute(interfaces.scala:637)
        at org.apache.spark.sql.sources.HadoopFsRelation.schema(interfaces.scala:635)
        at org.apache.spark.sql.execution.datasources.LogicalRelation.<init>(LogicalRelation.scala:39)
        at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:125)
        at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:136)
        at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:25)
        at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:30)
        at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:32)
        at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:34)
        at $iwC$$iwC$$iwC$$iwC.<init>(<console>:36)
        at $iwC$$iwC$$iwC.<init>(<console>:38)
        at $iwC$$iwC.<init>(<console>:40)
        at $iwC.<init>(<console>:42)
        at <init>(<console>:44)
        at .<init>(<console>:48)
        at .<clinit>(<console>)
        at .<init>(<console>:7)
        at .<clinit>(<console>)
        at $print(<console>)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1045)
        at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1326)
        at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:821)
        at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:852)
        at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:800)
        at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:857)
        at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:902)
        at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:814)
        at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:657)
        at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:665)
        at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:670)
        at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:997)
        at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
        at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
        at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
        at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:945)
        at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1064)
        at org.apache.spark.repl.Main$.main(Main.scala:35)
        at org.apache.spark.repl.Main.main(Main.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:730)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

ObvioslyЯ не могу использовать basePath, потому что пути не разделяют один.Я также пытаюсь использовать / * в конце каждого пути, это на самом деле работает, но полностью игнорирует разделы страны и региона.

Я могу читать путь один за другим и объединять его, но я чувствую, что что-то упустил.

Есть идеи, почему он работает только для одного пути и как заставить его работать для нескольких путей?

1 Ответ

0 голосов
/ 03 декабря 2018

Действительно, хотелось бы, чтобы все сообщения об ошибках были такими же четкими - If provided paths are partition directories, please set "basePath" in the options of the data source to specify the root directory of the table. If there are multiple root directories, please load them separately and then union them.

Относительный ли путь year=18/month=11/day=01 связан с разбиением, или вы просто использовали такое же соглашение?

Еслипервый верен, тогда вы должны просто прочитать s3a://bucket_name/path/to/file/ и использовать предикаты для фильтрации желаемых дат.Или, может быть, как подсказывает ошибка, вы можете попробовать sqlContext.read.option("basePath","s3a://bucket_name/path/to/file/").format("com.databricks.spark.avro").load(paths:_*), где пути относительны

Если последний имеет значение true, тогда вам следует запрашивать каждый из них отдельно и применять unionAll к кадрам данных (как следует из сообщения об ошибке),Возможно, обработка года / месяца / дня в качестве столбцов раздела также будет работать в этом случае, даже если вы не использовали partitionBy при записи данных ...

...