Сохранение пустого кадра данных в azure blov с расширением .csv - PullRequest
0 голосов
/ 07 апреля 2020

Привет, все, что я застрял в одном месте. У меня есть фрейм данных, например:

col1,col2
a,0
c,0
B,0
b,0

Я фильтрую его, и в некоторых ситуациях я могу получить пустой результирующий фрейм данных, например:

 val tempDf:DataFrame= df.filter(expr("col2=1"))

, что приводит к:

+---------+---------+
|Col1     |Col2     |
+---------+---------+
+---------+---------+

, когда я сохраняю его в azure blob

df.coalesce(1).write.option("header", "true").mode("overwrite").format("csv").save(location)

, это дает мне ошибку

Datasource does not support writing empty or nested empty schemas.
Please make sure the data schema has at least one or more column(s)

Я знаю, что невозможно сохранить пустой фрейм данных в azure blob напрямую. Но должен быть какой-то способ сделать это.
Я прошел через какой-то вопрос вроде , но я не получаю, но для обработки использовались локальные файловые системы.
Есть ли способ сохранить фрейм данных, когда есть пустые строки и что я думал, чтобы вставить одну строку с нулевыми значениями для каждого столбца, но это помешает моей следующей операции, потому что этот вывод будет использоваться для следующей обработки.
Я использую версию для свечи 2,4
Любая помощь будет оценена

1 Ответ

1 голос
/ 07 апреля 2020

это известная ошибка в spark SPARK-26208 , исправленная в версии 3.0.0. Если вы не можете перейти на spark 3.0, вы можете написать свой собственный класс с расширением CSVFileFormat:

package path.to.your.package

import org.apache.hadoop.mapreduce.TaskAttemptContext
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat
import org.apache.spark.sql.execution.datasources.{OutputWriter, OutputWriterFactory}
import org.apache.spark.sql.types.StructType

class CustomCSVFileFormat extends CSVFileFormat with Serializable {

  override def shortName(): String = CustomCSVFileFormat.EMPTY_CSV_WRITER_NAME

  override def prepareWrite(
                             sparkSession: SparkSession,
                             job: org.apache.hadoop.mapreduce.Job,
                             options: Map[String, String],
                             dataSchema: StructType): OutputWriterFactory = {
    val writerFactory = super.prepareWrite(sparkSession, job, options, dataSchema)
    new OutputWriterFactory {
      override def newInstance(
                                path: String,
                                dataSchema: StructType,
                                context: TaskAttemptContext): OutputWriter = {
        /* This custom writer is wrapper for common CsvOutputWriter.
         Custom writer checks if options contains:  header -> true, then writing empty row for print headers to file
         If headers printed common writer skip empty rows always and this doesn't affect to further writing */
        val outputWriter = writerFactory.newInstance(path, dataSchema, context)
        if (options.get("header").exists(_.toBoolean)) {
          outputWriter.write(InternalRow.empty)
        }
        outputWriter
      }

      override def getFileExtension(context: TaskAttemptContext): String = {
        writerFactory.getFileExtension(context)
      }
    }
  }
}

object CustomCSVFileFormat {
  val CUSTOM_CSV_WRITER_NAME = "customCSV"
}

и добавить файл

src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister

с содержанием:

path.to.your.package.CustomCSVFileFormat

и используйте свой произвольный формат по короткому имени:

df.coalesce(1)
  .write
  .option("header", "true")
  .mode("overwrite")
  .format(CustomCSVFileFormat.CUSTOM_CSV_WRITER_NAME)
  .save(location)
...