сначала создайте набор данных с помощью команды spark-sql:
spark.sql("select id ,a.userid,regexp_replace(b.tradeno,',','|') as TradeNo
,Amount ,TradeType ,TxTypeId
,regexp_replace(title,',','|') as title
,status ,tradetime ,TradeStatus
,regexp_replace(otherside,',','') as otherside
from
(
select userid
from tableA
where daykey='2018-10-30'
group by userid
) a
left join tableb b
on a.userid=b.userid
where b.userid is not null")
результат:
dataset: org.apache.spark.sql.DataFrame = [id: bigint, userid: int ... 9 more fields]
затем экспортируйте набор данных как csv с помощью команды:
dataset.coalesce(40).write.option("delimiter", ",").option("charset", "utf-8").csv("/binlog_test/mycsv.excel")
при запуске искровой задачи возникает следующая ошибка:
трассировка стека драйверов:
org.apache.spark.scheduler.DAGScheduler.org $ апача $ искры $ планировщик $ DAGScheduler $$ failJobAndIndependentStages (DAGScheduler.scala: 1430)
в org.apache.spark.scheduler.DAGScheduler $$ anonfun $ abortStage $ 1.apply (DAGScheduler.scala: 1418)
в org.apache.spark.scheduler.DAGScheduler $$ anonfun $ abortStage $ 1.apply (DAGScheduler.scala: 1417)
в scala.collection.mutable.ResizableArray $ class.foreach (ResizableArray.scala: 59)
в scala.collection.mutable.ArrayBuffer.foreach (ArrayBuffer.scala: 48)
в org.apache.spark.scheduler.DAGScheduler.abortStage (DAGScheduler.scala: 1417)
в org.apache.spark.scheduler.DAGScheduler $$ anonfun $ handleTaskSetFailed $ 1.apply (DAGScheduler.scala: 797)
в org.apache.spark.scheduler.DAGScheduler $$ anonfun $ handleTaskSetFailed $ 1.apply (DAGScheduler.scala: 797)
в scala.Option.foreach (Option.scala: 257)
в org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed (DAGScheduler.scala: 797)
в org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive (DAGScheduler.scala: 1645)
в org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive (DAGScheduler.scala: 1600)
в org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive (DAGScheduler.scala: 1589)
в org.apache.spark.util.EventLoop $$ anon $ 1.run (EventLoop.scala: 48)
в org.apache.spark.scheduler.DAGScheduler.runJob (DAGScheduler.scala: 623)
в org.apache.spark.SparkContext.runJob (SparkContext.scala: 1930)
в org.apache.spark.SparkContext.runJob (SparkContext.scala: 1943)
в org.apache.spark.SparkContext.runJob (SparkContext.scala: 1963)
в org.apache.spark.sql.execution.datasources.FileFormatWriter $$ anonfun $ write $ 1.apply $ mcV $ sp (FileFormatWriter.scala: 127)
... еще 69
Вызвано: java.lang.IllegalArgumentException: поле "id" не существует.
в org.apache.spark.sql.types.StructType $$ anonfun $ fieldIndex $ 1.apply (StructType.scala: 290)
в org.apache.spark.sql.types.StructType $$ anonfun $ fieldIndex $ 1.apply (StructType.scala: 290)
в scala.collection.MapLike $ class.getOrElse (MapLike.scala: 128)
в scala.collection.AbstractMap.getOrElse (Map.scala: 59)
в org.apache.spark.sql.types.StructType.fieldIndex (StructType.scala: 289)
в org.apache.spark.sql.hive.orc.OrcRelation $$ anonfun $ 6.apply (OrcFileFormat.scala: 308)
в org.apache.spark.sql.hive.orc.OrcRelation $$ anonfun $ 6.apply (OrcFileFormat.scala: 308)
на scala.collection.TraversableLike $$ anonfun $ map $ 1.apply (TraversableLike.scala: 234)
на scala.collection.TraversableLike $$ anonfun $ map $ 1.apply (TraversableLike.scala: 234)
в scala.collection.Iterator $ class.foreach (Iterator.scala: 893)
в scala.collection.AbstractIterator.foreach (Iterator.scala: 1336)
в scala.collection.IterableLike $ class.foreach (IterableLike.scala: 72)
в org.apache.spark.sql.types.StructType.foreach (StructType.scala: 96)
в scala.collection.TraversableLike $ class.map (TraversableLike.scala: 234)
в org.apache.spark.sql.types.StructType.map (StructType.scala: 96)
в org.apache.spark.sql.hive.orc.OrcRelation $ .setRequiredColumns (OrcFileFormat.scala: 308)
в org.apache.spark.sql.hive.orc.OrcFileFormat $$ anonfun $ buildReader $ 2.apply (OrcFileFormat.scala: 140)
в org.apache.spark.sql.hive.orc.OrcFileFormat $$ anonfun $ buildReader $ 2.apply (OrcFileFormat.scala: 129)
в org.apache.spark.sql.execution.datasources.FileFormat $$ anon $ 1.apply (FileFormat.scala: 138)
в org.apache.spark.sql.execution.datasources.FileFormat $$ anon $ 1.apply (FileFormat.scala: 122)в org.apache.spark.sql.execution.datasources.FileScanRDD $$ anon $ 1.nextIterator (FileScanRDD.scala: 168)
в org.apache.spark.sql.execution.datasources.FileScanRDD $$ anon $ 1.hasNext (FileScanRDD.scala: 109)
в org.apache.spark.sql.catalyst.expressions.GeneratedClass $ GeneratedIterator.processNext (неизвестный источник)
в org.apache.spark.sql.execution.BufferedRowIterator.hasNext (BufferedRowIterator.java:43)
в org.apache.spark.sql.execution.WholeStageCodegenExec $$ anonfun $ 8 $$ anon $ 1.hasNext (WholeStageCodegenExec.scala: 377)
на scala.collection.Iterator $$ anon $ 11.hasNext (Iterator.scala: 408)
в org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write (BypassMergeSortShuffleWriter.java:126)
в org.apache.spark.scheduler.ShuffleMapTask.runTask (ShuffleMapTask.scala: 96)
в org.apache.spark.scheduler.ShuffleMapTask.runTask (ShuffleMapTask.scala: 53)
в org.apache.spark.scheduler.Task.run (Task.scala: 99)
в org.apache.spark.executor.Executor $ TaskRunner.run (Executor.scala: 325)
в java.util.concurrent.ThreadPoolExecutor.runWorker (ThreadPoolExecutor.java:1142)
в java.util.concurrent.ThreadPoolExecutor $ Worker.run (ThreadPoolExecutor.java:617)
at java.lang.Thread.run (Thread.java:745)
но, когда я непосредственно выполняю объединение, использую куст и создаю новую таблицу с результатом объединения, наконец экспортируем набор данных с помощью команды spark-sql, и все идет хорошо.