Невозможно преобразовать набор данных Spark в Iterator с помощью встроенного в toLocalIterator () - PullRequest
1 голос
/ 29 мая 2019

Я пытаюсь преобразовать набор данных искры в итератор, чтобы записать набор данных в influenxdb.После того, как я сконструировал набор данных, который мне нужен, мне нужно преобразовать набор данных в итератор для передачи в модуль записи effxdb.

Однако проблема возникает при использовании встроенной функции toLocalIterator () для класса набора данных.

Я получил следующее исключение:

override def gatherTimeMetrics(df: DataFrame)
    (implicit params: ConversionParams, config: Config): Dataset[TimeMetric] = {
        df
            .select($ "download_date", $ "unixtime".cast("long") as "unixtime")
            .groupBy("download_date", "unixtime")
            .agg(count("*") as "rows")
            .repartition(1)
            .as[(String, Long, Long)]
            .map {
                case (downloadDate, unixtime, rows) =>
                TimeMetric(
                    unixtime,
                    Map(
                        "rows" - > rows
                    ),
                    Map(
                        "download_date" - > downloadDate
                    )
                )
            }
    }

Здесь используется возвращенный набор данных:

def run(df: DataFrame) (implicit params: T, config: Config): Unit =
    metricsService.write(Metrics(getMeasurementName, gatherTimeMetrics(df).toLocalIterator(), getCommonTags))

Я ожидаю, что сборка toLocalIterator () преобразуется в Iterator, но

Я получил это исключение:

Exception in thread "main" java.lang.NoClassDefFoundError: Could not initialize class org.apache.spark.sql.execution.datasources.parquet.ParquetSchemaConverter$
    at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat.buildReaderWithPartitionValues(ParquetFileFormat.scala:301)
    at org.apache.spark.sql.execution.FileSourceScanExec.inputRDD$lzycompute(DataSourceScanExec.scala:285)
    at org.apache.spark.sql.execution.FileSourceScanExec.inputRDD(DataSourceScanExec.scala:283)
    at org.apache.spark.sql.execution.FileSourceScanExec.inputRDDs(DataSourceScanExec.scala:303)
    at org.apache.spark.sql.execution.ProjectExec.inputRDDs(basicPhysicalOperators.scala:42)
    at org.apache.spark.sql.execution.aggregate.HashAggregateExec.inputRDDs(HashAggregateExec.scala:141)
    at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:386)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:116)
    at org.apache.spark.sql.execution.exchange.ShuffleExchange.prepareShuffleDependency(ShuffleExchange.scala:88)
    at org.apache.spark.sql.execution.exchange.ShuffleExchange$$anonfun$doExecute$1.apply(ShuffleExchange.scala:124)
    at org.apache.spark.sql.execution.exchange.ShuffleExchange$$anonfun$doExecute$1.apply(ShuffleExchange.scala:115)
    at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
    at org.apache.spark.sql.execution.exchange.ShuffleExchange.doExecute(ShuffleExchange.scala:115)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:116)
    at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:252)
    at org.apache.spark.sql.execution.aggregate.HashAggregateExec.inputRDDs(HashAggregateExec.scala:141)
    at org.apache.spark.sql.execution.DeserializeToObjectExec.inputRDDs(objects.scala:79)
    at org.apache.spark.sql.execution.MapElementsExec.inputRDDs(objects.scala:215)
    at org.apache.spark.sql.execution.SerializeFromObjectExec.inputRDDs(objects.scala:116)
    at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:386)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:116)
    at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:228)
    at org.apache.spark.sql.execution.SparkPlan.executeToIterator(SparkPlan.scala:290)
    at org.apache.spark.sql.Dataset$$anonfun$toLocalIterator$1.apply(Dataset.scala:2421)
    at org.apache.spark.sql.Dataset$$anonfun$toLocalIterator$1.apply(Dataset.scala:2416)
    at org.apache.spark.sql.Dataset$$anonfun$55.apply(Dataset.scala:2842)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:2841)
    at org.apache.spark.sql.Dataset.toLocalIterator(Dataset.scala:2416)

1 Ответ

1 голос
/ 29 мая 2019

Это зависимость, которая имеет класс org.apache.spark.sql.execution.datasources.parquet.ParquetSchemaConverter

<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.10</artifactId>
    <version>1.6.1</version>
</dependency>

для отладки / печати всех jar-файлов в classpath ниже, будет полезно понять, какие jar-файлы в вашем classpath.

def urlsinclasspath(cl: ClassLoader): Array[java.net.URL] = cl match {
    case null => Array()
    case u: java.net.URLClassLoader => u.getURLs() ++ urlsinclasspath(cl.getParent)
    case _ => urlsinclasspath(cl.getParent)
  }

Звонящий будет ...

val  urls = urlsinclasspath(getClass.getClassLoader).foreach(println)
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...