Как создать пустой фрейм данных в Spark - PullRequest
0 голосов
/ 30 мая 2018

У меня есть набор таблиц кустов на основе Avro, и мне нужно прочитать данные из них.Поскольку Spark-SQL использует ульи для чтения данных из HDFS, это намного медленнее, чем чтение HDFS напрямую.Поэтому я использовал блоки данных Spark-Avro jar для чтения файлов Avro из базовой директории HDFS.

Все отлично работает, кроме случаев, когда таблица пуста.Мне удалось получить схему из файла .avsc таблицы кустов с помощью следующей команды, но я получаю сообщение об ошибке " Не найдено файлов Avro "

val schemaFile = FileSystem.get(sc.hadoopConfiguration).open(new Path("hdfs://myfile.avsc"));

val schema = new Schema.Parser().parse(schemaFile);

spark.read.format("com.databricks.spark.avro").option("avroSchema", schema.toString).load("/tmp/myoutput.avro").show()

Обходные пути:

Я поместил пустой файл в этот каталог, и то же самое работает отлично.

Есть ли другие способы добиться того же?как настройка конф или что-то?

Ответы [ 4 ]

0 голосов
/ 08 мая 2019

Вам не нужно использовать emptyRDD.Вот что у меня работает с PySpark 2.4:

empty_df = spark.createDataFrame([], schema) # spark is the Spark Session

Если у вас уже есть схема из другого фрейма данных, вы можете просто сделать это:

schema = some_other_df.schema

Если нет,затем вручную создайте схему пустого фрейма данных, например:

schema = StructType([StructField("col_1", StringType(), True),
                     StructField("col_2", DateType(), True),
                     StructField("col_3", StringType(), True),
                     StructField("col_4", IntegerType(), False)]
                     )

Надеюсь, это поможет.

0 голосов
/ 31 мая 2018

В зависимости от версии Spark, вы можете использовать способ отражения. В SchemaConverters есть закрытый метод, который выполняет преобразование схемы в StructType .. (не уверен, почему она является закрытой).если честно, было бы очень полезно в других ситуациях).Используя отражение в скале, вы сможете сделать это следующим образом

import scala.reflect.runtime.{universe => ru}
import org.apache.avro.Schema
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType}

var schemaStr = "{\n \"type\": \"record\",\n \"namespace\": \"com.example\",\n \"name\": \"FullName\",\n \"fields\": [\n { \"name\": \"first\", \"type\": \"string\" },\n      { \"name\": \"last\", \"type\": \"string\" }\n  ]\n }"
val schema = new Schema.Parser().parse(schemaStr);

val m = ru.runtimeMirror(getClass.getClassLoader)
val module = m.staticModule("com.databricks.spark.avro.SchemaConverters")
val im = m.reflectModule(module)
val method = im.symbol.info.decl(ru.TermName("toSqlType")).asMethod

val objMirror = m.reflect(im.instance)
val structure = objMirror.reflectMethod(method)(schema).asInstanceOf[com.databricks.spark.avro.SchemaConverters.SchemaType]
val sqlSchema = structure.dataType.asInstanceOf[StructType]
val empty = spark.createDataFrame(spark.sparkContext.emptyRDD[Row], sqlSchema)

empty.printSchema
0 голосов
/ 11 ноября 2018

Аналогично ответу EmiCareOfCell44, чуть более элегантно и более «пусто»

val emptySchema = StructType(Seq())
val emptyDF = spark.createDataFrame(spark.sparkContext.emptyRDD[Row],
                emptySchema)
0 голосов
/ 30 мая 2018

Чтобы создать пустой DataFrame:

val my_schema = StructType(Seq(
    StructField("field1", StringType, nullable = false),
    StructField("field2", StringType, nullable = false)
  ))

val empty: DataFrame = spark.createDataFrame(spark.sparkContext.emptyRDD[Row], my_schema)

Может быть, это может помочь

...