spark.time () не работает для запроса данных - PullRequest
0 голосов
/ 22 декабря 2018

Я запускаю этот код в spark-shell, spark 2.3.0:

val lineitem=spark.read.parquet("hdfs://namenode:8020/lineitem.parquet")
val decrease = udf { (x: Double, y: Double) => x * (1 - y) }
val increase = udf { (x: Double, y: Double) => x * (1 + y) }
val q1=lineitem.filter($"l_shipdate" <= "1998-09-02")
  .groupBy($"l_returnflag", $"l_linestatus")
  .agg(sum($"l_quantity"), sum($"l_extendedprice"),
        sum(decrease($"l_extendedprice", $"l_discount")),
        sum(increase(decrease($"l_extendedprice", $"l_discount"), $"l_tax")),
        avg($"l_quantity"), avg($"l_extendedprice"), avg($"l_discount"), count($"l_quantity"))
.sort($"l_returnflag", $"l_linestatus")

и до сих пор все работает нормально.но когда я хочу измерить время выполнения запроса, используя spark.time(q1.show()), я получаю:

    2018-12-22 17:49:56 ERROR Executor:91 - Exception in task 0.0 in stage 9.0 (TID                                                                                         77)
java.lang.UnsatisfiedLinkError: org.xerial.snappy.SnappyNative.uncompressedLengt                                                                                        h(Ljava/nio/ByteBuffer;II)I
        at org.xerial.snappy.SnappyNative.uncompressedLength(Native Method)
        at org.xerial.snappy.Snappy.uncompressedLength(Snappy.java:561)
        at org.apache.parquet.hadoop.codec.SnappyDecompressor.decompress(SnappyD                                                                                        ecompressor.java:62)
        at org.apache.parquet.hadoop.codec.NonBlockedDecompressorStream.read(Non                                                                                        BlockedDecompressorStream.java:51)
        at java.io.DataInputStream.readFully(DataInputStream.java:195)
        at java.io.DataInputStream.readFully(DataInputStream.java:169)
        at org.apache.parquet.bytes.BytesInput$StreamBytesInput.toByteArray(Byte                                                                                        sInput.java:205)
        at org.apache.parquet.column.values.dictionary.PlainValuesDictionary$Pla                                                                                        inDoubleDictionary.<init>(PlainValuesDictionary.java:194)
        at org.apache.parquet.column.Encoding$1.initDictionary(Encoding.java:98)
        at org.apache.parquet.column.Encoding$4.initDictionary(Encoding.java:149                                                                                        )
        at org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnRe                                                                                        ader.<init>(VectorizedColumnReader.java:114)
        at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetR                                                                                        ecordReader.checkEndOfRowGroup(VectorizedParquetRecordReader.java:312)
        at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetR                                                                                        ecordReader.nextBatch(VectorizedParquetRecordReader.java:258)
        at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetR                                                                                        ecordReader.nextKeyValue(VectorizedParquetRecordReader.java:161)
        at org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNe                                                                                        xt(RecordReaderIterator.scala:39)
        at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNex                                                                                        t(FileScanRDD.scala:106)
        at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIt                                                                                        erator(FileScanRDD.scala:182)
        at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNex                                                                                        t(FileScanRDD.scala:106)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIte                                                                                        ratorForCodegenStage1.scan_nextBatch$(Unknown Source)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIte                                                                                        ratorForCodegenStage1.agg_doAggregateWithKeys$(Unknown Source)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIte                                                                                        ratorForCodegenStage1.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRo                                                                                        wIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$ano                                                                                        n$1.hasNext(WholeStageCodegenExec.scala:614)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
        at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(Bypa                                                                                        ssMergeSortShuffleWriter.java:125)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scal                                                                                        a:96)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scal                                                                                        a:53)
        at org.apache.spark.scheduler.Task.run(Task.scala:109)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.                                                                                        java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor                                                                                        .java:617)
        at java.lang.Thread.run(Thread.java:748)
2018-12-22 17:49:56 ERROR Executor:91 - Exception in task 2.0 in stage 9.0 (TID                                                                                         79)
java.lang.UnsatisfiedLinkError: org.xerial.snappy.SnappyNative.uncompressedLengt                                                                                        h(Ljava/nio/ByteBuffer;II)I
        at org.xerial.snappy.SnappyNative.uncompressedLength(Native Method)
        at org.xerial.snappy.Snappy.uncompressedLength(Snappy.java:561)
        at org.apache.parquet.hadoop.codec.SnappyDecompressor.decompress(SnappyD                                                                                        ecompressor.java:62)
        at org.apache.parquet.hadoop.codec.NonBlockedDecompressorStream.read(Non                                                                                        BlockedDecompressorStream.java:51)
        at java.io.DataInputStream.readFully(DataInputStream.java:195)
        at java.io.DataInputStream.readFully(DataInputStream.java:169)
        at org.apache.parquet.bytes.BytesInput$StreamBytesInput.toByteArray(Byte                                                                                        sInput.java:205)

Есть идеи, что не так и как это решить?

1 Ответ

0 голосов
/ 22 декабря 2018

Я считаю, что проблема не связана с spark.time.Проблема вызвана невозможностью чтения сжатых файлов Snappy.Первый блок кода, который вы разместили, - это просто преобразование, то есть Spark на самом деле не пытается его выполнить.Помните, Spark использует ленивую оценку.

Только после того, как вы вызовете q1.show(), он фактически попытается выполнить запрос, который вызывает ошибку при чтении файлов Snappy.

Что вам действительно нужно для устранения неполадок, так это lang.UnsatisfiedLinkError,Недавно была исправлена ​​проблема, связанная с конфликтом в версиях Snappy, используемых Spark 2.3.0 и Hadoop 2.8.3:

Похоже, что обновление до Spark 2.3.2 устраняет проблему:

https://issues.apache.org/jira/browse/SPARK-24018?page=com.atlassian.jira.plugin.system.issuetabpanels%3Aall-tabpanel

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...