Наборы данных Spark для преобразования файлов паркета - PullRequest
0 голосов
/ 05 июня 2018

Я новичок в программировании на spark / scala.Я пытаюсь сохранить наборы данных spark в файл партера, но создается только каталог партера без какого-либо подкаталога или файлов внутри него.

фрагмент кода: объект Dimension1 {класс дела DIM_DATE_PR (ключ_даты: Int, date_fld: метка времени, имя_имя: String, date_key_ly: Int, date_fld_ly: метка времени, day_in_year: Int, day_in_month: Int, day_in_week: тип: int, day_in_week: типСтрока, year_of_day: Int, имя_года: Int, half_in_year: Int, half_year_name: String, quar_in_year: Int, имя_ квартала: String, month_in_quarter: Int, month_in_year: Int, имя_месяца: String, week_in_year: строка, имя_годной недели: строка: название неделиweek_name_in_month: String, week_st_date_key: String, week_end_date_key: Int, month_st_date_key: Int, month_end_date_key: Int, week_number_this_year: Int, год: Int, квартал: Int, ca_date_key: Int, day_type_seq: Int, year_st_date_key: Int, date_key_ly_day2day: Int, date_key_ct_ly:Int)

def mapper (строка: строка): DIM_DATE_PR = {

val fields = line.split("\\|", -1)
val formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
val DIM_DATE_PRL1: DIM_DATE_PR = DIM_DATE_PR(
  fields(0).toInt,
  new Timestamp(formatter.parse(fields(1)).getTime),
  fields(2),
  if (fields(3).isEmpty()) { 0 } else { fields(3).toInt },
  if (fields(4).isEmpty()) { new Timestamp(formatter.parse("2012-04-01 00:00:00").getTime) } else { new Timestamp(formatter.parse(fields(4)).getTime) },
  fields(5).toInt,
  fields(6).toInt,
  fields(7).toInt,
  fields(8),
  fields(9).toInt,
  fields(10).toInt,
  fields(11).toInt,
  fields(12),
  fields(13).toInt,
  fields(14),
  fields(15).toInt,
  fields(15).toInt,
  fields(16),
  fields(17),
  fields(18),
  fields(19),
  fields(20),
  fields(21),
  fields(22).toInt,
  fields(23).toInt,
  fields(25).toInt,
  if (fields(26).isEmpty()) { 0 } else { fields(26).toInt },
  if (fields(27).isEmpty()) { 0 } else { fields(27).toInt },
  if (fields(28).isEmpty()) { 0 } else { fields(28).toInt },
  if (fields(29).isEmpty()) { 0 } else { fields(29).toInt },
  if (fields(30).isEmpty()) { 0 } else { fields(30).toInt },
  fields(31).toInt,
  if (fields(32).isEmpty()) { 0 } else { fields(32).toInt },
  if (fields(33).isEmpty()) { 0 } else { fields(33).toInt })
return DIM_DATE_PRL1

}

/ ** Наша основная функция, в которой происходит действие * / def main (args: Array [String]) {

// Set the log level to only print errors
Logger.getLogger("org").setLevel(Level.ERROR)

// Use new SparkSession interface in Spark 2.0
val spark = SparkSession
  .builder
  .appName("SparkSQL")
  .master("local[*]")
  .config("spark.sql.warehouse.dir", "file:///C:/temp") // Necessary to work around a Windows bug in Spark 2.0.0; omit if you're not on Windows.
  .getOrCreate()

// Convert our csv file to a DataSet, using our Person case
// class to infer the schema.
import spark.implicits._
//val dataPath_tim = "D:\Project_Scala_Code_Optimizor\Input.txt" //args(0)
// val lines = spark.sparkContext.textFile("D:/Project_Scala_Code_Optimizor/input.txt")
val datardd_tim = spark.sparkContext.textFile("D:/Project_Scala_Code_Optimizor/Input.txt")
val timedimds = datardd_tim.map(mapper).toDS().cache()
 timedimds.write.parquet("D:/Project_Scala_Code_Optimizor/dimension.parquet")
spark.stop()

}}

...