I am trying to convert a Spark Dataset into an iterator in order to write the data to InfluxDB. After constructing the Dataset I need, I have to turn it into an Iterator to pass to the InfluxDB writer.
The problem occurs when I call the built-in toLocalIterator() method of the Dataset class.
Here is the code that builds the Dataset:
import org.apache.spark.sql.functions.count
// $ and .as[...] assume the SparkSession implicits are in scope:
// import spark.implicits._

override def gatherTimeMetrics(df: DataFrame)
                              (implicit params: ConversionParams, config: Config): Dataset[TimeMetric] = {
  df
    .select($"download_date", $"unixtime".cast("long") as "unixtime")
    .groupBy("download_date", "unixtime")
    .agg(count("*") as "rows") // row count per (download_date, unixtime)
    .repartition(1)
    .as[(String, Long, Long)]
    .map { case (downloadDate, unixtime, rows) =>
      TimeMetric(
        unixtime,
        Map("rows" -> rows),
        Map("download_date" -> downloadDate)
      )
    }
}
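For reference, TimeMetric is a plain case class along these lines (a sketch inferred from how it is constructed above; the actual field names may differ):

// Assumed shape of TimeMetric, matching how it is built in the map above.
case class TimeMetric(
  unixtime: Long,
  values: Map[String, Long],
  tags: Map[String, String]
)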
Here is where the returned Dataset is used:
def run(df: DataFrame)(implicit params: T, config: Config): Unit =
  metricsService.write(Metrics(getMeasurementName, gatherTimeMetrics(df).toLocalIterator(), getCommonTags))
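For completeness, the writer side consumes the iterator roughly like this (a hypothetical sketch; the real Metrics and MetricsService definitions differ and are not shown here). Note that Dataset.toLocalIterator() returns a java.util.Iterator:

// Hypothetical sketch of the consuming side; names and signatures are assumptions.
case class Metrics(measurement: String,
                   points: java.util.Iterator[TimeMetric],
                   commonTags: Map[String, String])

class MetricsService {
  def write(metrics: Metrics): Unit = {
    val it = metrics.points
    while (it.hasNext) {
      val m = it.next()
      // map each TimeMetric to an InfluxDB point and send it here
    }
  }
}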
I expect toLocalIterator() to give me an Iterator over the Dataset, but instead I get this exception:
Exception in thread "main" java.lang.NoClassDefFoundError: Could not initialize class org.apache.spark.sql.execution.datasources.parquet.ParquetSchemaConverter$
at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat.buildReaderWithPartitionValues(ParquetFileFormat.scala:301)
at org.apache.spark.sql.execution.FileSourceScanExec.inputRDD$lzycompute(DataSourceScanExec.scala:285)
at org.apache.spark.sql.execution.FileSourceScanExec.inputRDD(DataSourceScanExec.scala:283)
at org.apache.spark.sql.execution.FileSourceScanExec.inputRDDs(DataSourceScanExec.scala:303)
at org.apache.spark.sql.execution.ProjectExec.inputRDDs(basicPhysicalOperators.scala:42)
at org.apache.spark.sql.execution.aggregate.HashAggregateExec.inputRDDs(HashAggregateExec.scala:141)
at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:386)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:116)
at org.apache.spark.sql.execution.exchange.ShuffleExchange.prepareShuffleDependency(ShuffleExchange.scala:88)
at org.apache.spark.sql.execution.exchange.ShuffleExchange$$anonfun$doExecute$1.apply(ShuffleExchange.scala:124)
at org.apache.spark.sql.execution.exchange.ShuffleExchange$$anonfun$doExecute$1.apply(ShuffleExchange.scala:115)
at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
at org.apache.spark.sql.execution.exchange.ShuffleExchange.doExecute(ShuffleExchange.scala:115)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:116)
at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:252)
at org.apache.spark.sql.execution.aggregate.HashAggregateExec.inputRDDs(HashAggregateExec.scala:141)
at org.apache.spark.sql.execution.DeserializeToObjectExec.inputRDDs(objects.scala:79)
at org.apache.spark.sql.execution.MapElementsExec.inputRDDs(objects.scala:215)
at org.apache.spark.sql.execution.SerializeFromObjectExec.inputRDDs(objects.scala:116)
at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:386)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:116)
at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:228)
at org.apache.spark.sql.execution.SparkPlan.executeToIterator(SparkPlan.scala:290)
at org.apache.spark.sql.Dataset$$anonfun$toLocalIterator$1.apply(Dataset.scala:2421)
at org.apache.spark.sql.Dataset$$anonfun$toLocalIterator$1.apply(Dataset.scala:2416)
at org.apache.spark.sql.Dataset$$anonfun$55.apply(Dataset.scala:2842)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:2841)
at org.apache.spark.sql.Dataset.toLocalIterator(Dataset.scala:2416)
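For context, the input comes from a Parquet source (ParquetFileFormat appears in the stack trace), and the job is wired up roughly like this (a simplified sketch; the app name and path are placeholders):

import org.apache.spark.sql.SparkSession

// Simplified entry point; the real setup differs.
val spark = SparkSession.builder()
  .appName("influxdb-metrics") // placeholder name
  .getOrCreate()

import spark.implicits._

// The stack trace goes through ParquetFileFormat, so the source is Parquet.
val df = spark.read.parquet("/path/to/input") // placeholder path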