Как работать с многострочными без кавычек при чтении CSV-файла в Spark - PullRequest
0 голосов
/ 01 апреля 2020

Варианты этого были заданы ранее, но в моем случае нет строк в кавычках в моей многострочной.

У меня есть такой файл.

column1|column2
1|test1 test1 
test1
2|test2

Я хочу результат, подобный этому: (2 строки)

+-----------+------------------+
|column1    |column2           |
+-----------+------------------+
|          1| test1 test1
               test1
|          2|test2             |

Я пробовал это:

 Dataset<Row> dsTest = session.read()
                .option("header", "true")
                .option("delimiter", "|")
                .option("quote", "")
                .option("multiLine", "true")
                .csv("test.csv");

, и я получил это (3 строки)

 +-----------+------------------+
    |column1    |column2           |
    +-----------+------------------+
    |          1| test1 test1
           test1| null    
    |          2|test2             |

Может кто-то один направляет меня, чтобы решить эту проблему.

1 Ответ

1 голос
/ 01 апреля 2020

Я пробовал что-то вроде этого, может быть, это может дать вам некоторые подсказки. Код в Scala, но я думаю, что все ясно, и в Java он будет более или менее таким же.

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession

import scala.util.{Failure, Success, Try}

object Multiline {

  val spark = SparkSession
    .builder()
    .appName("Multiline")
    .master("local[*]")
    .config("spark.sql.shuffle.partitions", "4") //Change to a more reasonable default number of partitions for our data
    .config("spark.app.id", "Multiline")  // To silence Metrics warning
    .getOrCreate()

  val sc = spark.sparkContext

  val input = "/home/cloudera/files/tests/multiline.csv"

  def main(args: Array[String]): Unit = {

    Logger.getRootLogger.setLevel(Level.ERROR)

    import spark.implicits._

    try {

      def makeInt(s: String): Int = Try(s.toInt) match {
        case Success(n) => n
        case Failure(_) => -1
      }

      val data = sc.textFile(input)
      val head = data.first() // header

      val multiline = data
          .filter(line => line != head) // remove header
          .map(line => line.split('|'))
          .map(arr =>{
            val sInt: Int = makeInt(arr(0))
            if(sInt < 0) (sInt.toString, arr(0))
            else (arr(0),arr(1))
          })
          .toDF("column1", "column2")

      multiline.show()

      /*
      +-------+------------+
      |column1|     column2|
      +-------+------------+
      |      1|test1 test1 |
      |     -1|       test1|
      |      2|       test2|
      +-------+------------+
       */

      // To have the opportunity to view the web console of Spark: http://localhost:4040/
      println("Type whatever to the console to exit......")
      scala.io.StdIn.readLine()
    } finally {
      sc.stop()
      println("SparkContext stopped.")
      spark.stop()
      println("SparkSession stopped.")
    }
  }
}
...