Чтение файла в Spark с символом новой строки (\ n) в полях, экранированный с помощью backsla sh (\) и без кавычек - PullRequest
2 голосов
/ 02 апреля 2020

У меня есть входной файл, который имеет следующую структуру,

col1, col2, col3

line1filed1,line1filed2.1\

line1filed2.2, line1filed3

line2filed1,line2filed2.1\

line2filed2.2, line2filed3

line3filed1, line3filed2, line3filed3

line4filed1,line4filed2,

line5filed1,,line5filed3

Выходной кадр данных должен быть,

col1, col2, col3

[line1filed1,line1filed2.1 line1filed2.2, line1filed3]

[line2filed1,line2filed2.1 line2filed2.2, line2filed3]

[line3filed1, line3filed2, line3filed3]

[line4filed1,line4filed2, null]

[line5filed1, null, line5filed3]

Я пытаюсь сделать

spark
.read
.option("multiLine", "true")
.option("escape", "\\")
.csv("path to file")

Некоторые решения предлагают перейти на wholeTextFiles, но также упоминается, что wholeTextFiles не является оптимальным решением.

Каков будет правильный способ сделать это?

PS: у меня есть входной производственный файл 50 ГБ.

1 Ответ

1 голос
/ 03 апреля 2020

Я пробовал этот кусок кода, думаю, его можно улучшить, но, возможно, он даст вам некоторые подсказки для решения вашей проблемы.

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession

/**
  * col1, col2, col3
  * [line1filed1,line1filed2.1 line1filed2.2, line1filed3]
  * [line2filed1,line2filed2.1 line2filed2.2, line2filed3]
  * [line3filed1, line3filed2, line3filed3]
  * [line4filed1,line4filed2, null]
  * [line5filed1, null, line5filed3]
  */

object Multiline2 {

  val spark = SparkSession
    .builder()
    .appName("Multiline2")
    .master("local[*]")
    .config("spark.sql.shuffle.partitions", "4") //Change to a more reasonable default number of partitions for our data
    .config("spark.app.id", "Multiline2")  // To silence Metrics warning
    .getOrCreate()

  val sc = spark.sparkContext

  val input = "/home/cloudera/files/tests/multiline2.csv"

  def main(args: Array[String]): Unit = {
    try {
      Logger.getRootLogger.setLevel(Level.ERROR)

      val data = sc.textFile(input)
      val header = data.first()
      val columns = header.split(",")

      import spark.implicits._

      var aux = ""
      val multiline = data
          .filter(line => !line.equals(header))
          .map(line => {
            if(line.contains("\")) {
              aux = line.substring(0,line.lastIndexOf("\"))
              ""
            } else {
              val l = s"$aux $line"
              aux = ""
              l
            }
          })
          .filter(line => !line.equals(""))
          .map(line => line.split(","))
          .map(r =>{ r.length match {
            case 2 => (r(0).trim,r(1).trim,"")
            case _ => (r(0).trim,r(1).trim,r(2).trim)
          }})
          .toDF(columns(0).trim, columns(1).trim, columns(2).trim)

      multiline.show()

      // To have the opportunity to view the web console of Spark: http://localhost:4040/
      println("Type whatever to the console to exit......")
      scala.io.StdIn.readLine()
    } finally {
      sc.stop()
      println("SparkContext stopped.")
      spark.stop()
      println("SparkSession stopped.")
    }
  }
}
+-----------+--------------------+-----------+
|       col1|                col2|       col3|
+-----------+--------------------+-----------+
|line1filed1|line1filed2.1 lin...|line1filed3|
|line2filed1|line2filed2.1 lin...|line2filed3|
|line3filed1|         line3filed2|line3filed3|
|line4filed1|         line4filed2|           |
|line5filed1|                    |line5filed3|
+-----------+--------------------+-----------+

С уважением.

...