Схема подсчета Spark с ограничением во время чтения .csv - PullRequest
2 голосов
/ 29 апреля 2019

Я бы хотел вывести схему Spark.DataFrame из каталога CSV-файлов, используя небольшое подмножество строк (скажем, limit(100)).

Однако, установив inferSchema в Trueозначает, что Input Size / Records для FileScanRDD кажется всегда равным количеству строк во всех файлах CSV.

Есть ли способ сделать FileScan более избирательным, чтобы Spark просматривал меньшее количество строк при выводе схемы?

Примечание: для параметра samplingRatio для установлено значениеbe <1.0 не имеет желаемого поведения, хотя ясно, что inferSchema использует только выборочное подмножество строк. </p>

Ответы [ 2 ]

3 голосов
/ 02 мая 2019

Вы можете прочитать подмножество ваших входных данных в набор данных String. Метод CSV позволяет передать это в качестве параметра.

Вот простой пример (я оставлю вам чтение образца строк из входного файла):

val data = List("1,2,hello", "2,3,what's up?")
val csvRDD = sc.parallelize(data)
val df = spark.read.option("inferSchema","true").csv(csvRDD.toDS)
df.schema

При запуске в spark-shell выводится последняя строка из вышеперечисленных (я переформатировал ее для удобства чтения):

res4: org.apache.spark.sql.types.StructType = 
    StructType(
      StructField(_c0,IntegerType,true),
      StructField(_c1,IntegerType,true),
      StructField(_c2,StringType,true)
    )

Какая правильная схема для моего ограниченного набора входных данных.

2 голосов
/ 09 мая 2019

Предполагая, что вас интересует только схема, здесь возможен подход, основанный на сообщении cipri.l в этой ссылке

import org.apache.spark.sql.execution.datasources.csv.{CSVOptions, TextInputCSVDataSource}
def inferSchemaFromSample(sparkSession: SparkSession, fileLocation: String, sampleSize: Int, isFirstRowHeader: Boolean): StructType = {
  // Build a Dataset composed of the first sampleSize lines from the input files as plain text strings
  val dataSample: Array[String] = sparkSession.read.textFile(fileLocation).head(sampleSize)
  import sparkSession.implicits._
  val sampleDS: Dataset[String] = sparkSession.createDataset(dataSample)
  // Provide information about the CSV files' structure
  val firstLine = dataSample.head
  val extraOptions = Map("inferSchema" -> "true",   "header" -> isFirstRowHeader.toString)
  val csvOptions: CSVOptions = new CSVOptions(extraOptions, sparkSession.sessionState.conf.sessionLocalTimeZone)
  // Infer the CSV schema based on the sample data
  val schema = TextInputCSVDataSource.inferFromDataset(sparkSession, sampleDS, Some(firstLine), csvOptions)
  schema
}

В отличие от GMc В ответе выше, этот подход пытается напрямую вывести схему так же, как DataFrameReader.csv (), в фоновом режиме (но без усилий по созданию дополнительного набора данных с этой схемой,который мы затем будем использовать только для извлечения схемы из нее)

Схема выводится на основе набора данных [String], содержащего только первые sampleSize строки из входных файлов в виде строк простого текста.

При попытке извлечь образцы из данных Spark использует только 2 типа методов:

  1. Методы, которые извлекают определенный процент данных.Эта операция берет случайные выборки из всех разделов.Он выигрывает от более высокого параллелизма, но должен читать все входные файлы.
  2. Методы, которые получают определенное количество строк.Эта операция должна собирать данные о драйвере, но она может прочитать один раздел (если необходимое количество строк достаточно мало)

Поскольку вы упомянули, что вы хотите использовать определенное небольшое количество строк итак как вы хотите избежать касания всех данных, я предоставил решение, основанное на опции 2

PS: метод DataFrameReader.textFile принимает пути к файлам, папкам и также имеет вариант varargs, так что вы можете передатьодин или несколько файлов или папок.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...