Почему в формате df write console ничего не отображается? - PullRequest
0 голосов
/ 26 апреля 2020

У меня есть состояние c данных, как записать его в консоль вместо использования df.show()

val sparkConfig = new SparkConf().setAppName("streaming-vertica").setMaster("local[2]")
val sparkSession = SparkSession.builder().master("local[2]").config(sparkConfig).getOrCreate()
val sc = sparkSession.sparkContext

val rows = sc.parallelize(Array(
  Row(1,"hello", true),
  Row(2,"goodbye", false)
))

val schema = StructType(Array(
  StructField("id",IntegerType, false),
  StructField("sings",StringType,true),
  StructField("still_here",BooleanType,true)
))

val df = sparkSession.createDataFrame(rows, schema) 

df.write
  .format("console")
  .mode("append")

Это ничего не записывает в консоль:

 Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
20/04/27 00:30:59 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

Process finished with exit code 0

При использовании save:

   df.write
      .format("console")
      .mode("append")
      .save()

Это дает:

Использование стандартного профиля Spark log4j: org / apache / spark / log4j-defaults.properties 20/04/27 00:45:01 WARN NativeCodeLoader: Невозможно загрузить нативную библиотеку oop для вашей платформы ... с использованием встроенных java классов, где применимо Исключение в потоке "main" java .lang.RuntimeException: org. apache .spark. sql .execution.streaming.ConsoleSinkProvider не позволяет создавать таблицы как выбранные. в scala .sys.package $ .error (package. scala: 27) в org. apache .spark. sql .execution.datasources.DataSource.write (DataSource. scala: 473) в org. apache .spark. sql .execution.datasources.SaveIntoDataSourceCommand.run (SaveIntoDataSourceCommand. scala: 50) в org. apache .spark. sql .execution.command.ExecutedCommandExe c. sideEffectResult $ lzycompute (команды. scala: 58) в орг. apache .spark. sql .execution.command.ExecutedCommandExe c .sideEffectResult (команды. scala: 56) в орг. apache .spark. sql .execution.command.ExecutedCommandExe c .doExecute (команды. scala: 74) в org. apache .spark. sql .execution.SparkPlan $$ anonfun $ execute $ 1.apply (SparkPlan. scala: 117) в орг. apache .spark. sql .execution.SparkPlan $$ anonfun $ execute $ 1.apply (SparkPlan. scala: 117) в орг. apache .spark . sql .execution.SparkPlan $$ anonfun $ executeQuery $ 1.apply (SparkPlan. scala: 138) в org. apache .spark.rdd.RDDOperationScope $ .withScope (RDDOperationScope. scala: 151) в орг. apache .spark. sql .execution.SparkPlan.execu teQuery (SparkPlan. scala: 135) в орг. apache .spark. sql .execution.SparkPlan.execute (SparkPlan. scala: 116) в орг. apache .spark. sql. execute.QueryExecution.toRdd $ lzycompute (QueryExecution. scala: 92) в орг. apache .spark. sql .execution.QueryExecution.toRdd (QueryExecution. scala: 92) в орг. apache. spark. sql .DataFrameWriter.runCommand (DataFrameWriter. scala: 609) в org. apache .spark. sql .DataFrameWriter.save (DataFrameWriter. scala: 233) в rep.StaticDFWrite $ .main (StaticDFWrite. scala: 35) в rep.StaticDFWrite.main (StaticDFWrite. scala)

Версия Spark = 2.2.1
scala версия = 2.11.12

1 Ответ

0 голосов
/ 26 апреля 2020

Вы должны вызвать save для объекта DataFrameWriter.

без метода сохранения, он просто создаст объект DataFrameWriter и завершит ваш сеанс.

Проверьте код ниже, я проверил в spark- shell.

Обратите внимание, что этот код работает на версии 2.4.0 spark, но не работает на 2.2.0

Формат консоли не будет работать с записью в версии 2.2.0 - https://issues.apache.org/jira/browse/SPARK-20599

scala> df.write.format("console").mode("append")
res5: org.apache.spark.sql.DataFrameWriter[org.apache.spark.sql.Row] = org.apache.spark.sql.DataFrameWriter@148a3112

scala> df.write.format("console").mode("append").save()
+--------+---+
|    name|age|
+--------+---+
|srinivas| 20|
+--------+---+


...