How do I get a Dataset from a JavaRDD and run SparkSQL queries against it?
0 votes
/ 04 April 2019

I want to build a Dataset object from a JavaRDD and then run SparkSQL queries on it. I can create the Dataset object, but when I run SQL against it I get an exception.

This is Apache Spark with Java; right now I am running Spark in local standalone mode on Windows 10. Later I will run this code on a real cluster.
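For reference, the general JavaRDD-to-Dataset pattern I am trying to follow is, as far as I can tell, the standard one. This minimal sketch uses a hypothetical Person bean in place of my OutputParameters class:

    import java.io.Serializable;
    import java.util.Arrays;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Encoders;
    import org.apache.spark.sql.SparkSession;

    public class MinimalDatasetExample {
        // A plain Java bean: public no-arg constructor plus getters/setters,
        // which is what Encoders.bean() requires.
        public static class Person implements Serializable {
            private String name;
            private int age;
            public Person() {}
            public Person(String name, int age) { this.name = name; this.age = age; }
            public String getName() { return name; }
            public void setName(String name) { this.name = name; }
            public int getAge() { return age; }
            public void setAge(int age) { this.age = age; }
        }

        public static void main(String[] args) {
            SparkSession spark = SparkSession.builder()
                    .appName("minimal").master("local").getOrCreate();

            // Build a typed Dataset from an in-memory list, register it as a
            // temp view, and query it with SparkSQL.
            Dataset<Person> people = spark.createDataset(
                    Arrays.asList(new Person("a", 1), new Person("b", 2)),
                    Encoders.bean(Person.class));
            people.createOrReplaceTempView("people");
            spark.sql("SELECT age, COUNT(*) AS cnt FROM people GROUP BY age").show();
        }
    }

My actual code is below: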

    // Imports used by this snippet:
    import java.util.List;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Encoders;
    import org.apache.spark.sql.SparkSession;

    SparkConf sparkConf = new SparkConf().setMaster("local").setAppName("NumCount");
    sparkConf.set("spark.hadoop.validateOutputSpecs", "false");
    sparkConf.set("spark.driver.allowMultipleContexts", "true");

    SparkSession sparkSession = SparkSession.builder().appName("dump").master("local").getOrCreate();

    JavaSparkContext context = new JavaSparkContext(sparkConf);

    JavaRDD<String> lines = context.textFile("in/RUDSRBOJsonDump.dat");

    // Variable keys to analyze, collected to the driver as a plain list.
    JavaRDD<String> variableKeylines = context.textFile("in/variablesToAnalyze.txt");
    List<String> varList = variableKeylines.collect();

    // Field 3 of each pipe-delimited line, then the value after the colon.
    JavaRDD<String> cpWithColon = lines.map(line -> line.split("\\|")[3]);
    JavaRDD<String> checkPoints = cpWithColon.map(line -> line.split(":")[1].trim());

    // decode(), parseJsonForMutipleTuples(), format and format2 are
    // helpers defined elsewhere in this class.
    JavaRDD<OutputParameters> counted = lines.flatMap(x -> parseJsonForMutipleTuples(
            decode(x.split("\\|")[4]),
            format2.format(format.parse(x.split("\\|")[0].split("\u0010")[7])),
            varList).iterator());

    Dataset<OutputParameters> dataset = sparkSession.createDataset(
            counted.collect(), Encoders.bean(OutputParameters.class));
    dataset.createOrReplaceTempView("ds_variable_value");

    // Note: the COUNT(*) column is not aliased here.
    String sql = "SELECT datestr, key, value, COUNT(*) FROM ds_variable_value GROUP BY datestr, key, value";
    Dataset<OutputParameters> result = sparkSession.sql(sql).as(Encoders.bean(OutputParameters.class));
    result.show();
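The result.show() call at the end is the line the trace below reports as show at SparkService.java:171.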

I get the following exception and cannot find a solution. I have tried various Spark APIs for comparison, but nothing works.

11:12:36.240 [Executor task launch worker for task 0] ERROR org.apache.spark.executor.Executor - Exception in task 0.0 in stage 0.0 (TID 0)
java.lang.ClassCastException: org.apache.spark.util.SerializableConfiguration cannot be cast to [B
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:86)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
11:12:36.262 [dispatcher-event-loop-1] DEBUG org.apache.spark.scheduler.TaskSchedulerImpl - parentName: , name: TaskSet_0.0, runningTasks: 0
11:12:36.262 [dispatcher-event-loop-1] DEBUG org.apache.spark.scheduler.TaskSetManager - No tasks for locality level NO_PREF, so moving to locality level ANY
11:12:36.269 [task-result-getter-0] WARN org.apache.spark.scheduler.TaskSetManager - Lost task 0.0 in stage 0.0 (TID 0, localhost, executor driver): java.lang.ClassCastException: org.apache.spark.util.SerializableConfiguration cannot be cast to [B
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:86)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

11:12:36.273 [task-result-getter-0] ERROR org.apache.spark.scheduler.TaskSetManager - Task 0 in stage 0.0 failed 1 times; aborting job
11:12:36.274 [task-result-getter-0] INFO org.apache.spark.scheduler.TaskSchedulerImpl - Removed TaskSet 0.0, whose tasks have all completed, from pool 
11:12:36.281 [dag-scheduler-event-loop] INFO org.apache.spark.scheduler.TaskSchedulerImpl - Cancelling stage 0
11:12:36.283 [dag-scheduler-event-loop] INFO org.apache.spark.scheduler.DAGScheduler - ShuffleMapStage 0 (show at SparkService.java:171) failed in 0.091 s due to Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost, executor driver): java.lang.ClassCastException: org.apache.spark.util.SerializableConfiguration cannot be cast to [B
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:86)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
11:12:36.285 [dag-scheduler-event-loop] DEBUG org.apache.spark.scheduler.DAGScheduler - Removing stage 1 from waiting set.
11:12:36.285 [dag-scheduler-event-loop] DEBUG org.apache.spark.scheduler.DAGScheduler - After removal of stage 1, remaining stages = 1
11:12:36.285 [dag-scheduler-event-loop] DEBUG org.apache.spark.scheduler.DAGScheduler - After removal of stage 0, remaining stages = 0
11:12:36.286 [main] INFO org.apache.spark.scheduler.DAGScheduler - Job 0 failed: show at SparkService.java:171, took 0.137233 s
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost, executor driver): java.lang.ClassCastException: org.apache.spark.util.SerializableConfiguration cannot be cast to [B
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:86)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1599)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1587)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1586)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1586)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1820)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1769)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1758)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2027)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2048)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2067)
    at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:363)
    at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
    at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3272)
    at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2484)
    at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2484)
    at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3253)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3252)
    at org.apache.spark.sql.Dataset.head(Dataset.scala:2484)
    at org.apache.spark.sql.Dataset.take(Dataset.scala:2698)
    at org.apache.spark.sql.Dataset.showString(Dataset.scala:254)
    at org.apache.spark.sql.Dataset.show(Dataset.scala:723)
    at org.apache.spark.sql.Dataset.show(Dataset.scala:682)
    at org.apache.spark.sql.Dataset.show(Dataset.scala:691)
    at com.csdp.service.spark.SparkService.dumpFileAnalysis(SparkService.java:171)
    at com.csdp.service.spark.SparkService.main(SparkService.java:249)
Caused by: java.lang.ClassCastException: org.apache.spark.util.SerializableConfiguration cannot be cast to [B
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:86)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
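The only structural oddity I can point to is that the snippet creates two contexts: a JavaSparkContext built from its own SparkConf plus a separate SparkSession (which is why spark.driver.allowMultipleContexts is set). Assuming, and this is only my guess, that the duplicate contexts are what confuses the task broadcast, a single-context variant of the setup would look like this:

    SparkSession sparkSession = SparkSession.builder()
            .appName("dump")
            .master("local")
            .config("spark.hadoop.validateOutputSpecs", "false")
            .getOrCreate();

    // Reuse the session's context instead of constructing a second one,
    // so allowMultipleContexts is no longer needed.
    JavaSparkContext context = new JavaSparkContext(sparkSession.sparkContext());

    JavaRDD<String> lines = context.textFile("in/RUDSRBOJsonDump.dat");
    // ... same transformations as above ...

    // The Dataset can also be built straight from the RDD, avoiding the
    // collect() round-trip through the driver:
    Dataset<OutputParameters> dataset = sparkSession.createDataset(
            counted.rdd(), Encoders.bean(OutputParameters.class));

Is that the right way to wire the session and the context together, or is something else causing the ClassCastException?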