Spark Scala каждый набор данных выводится как один ряд данных - PullRequest
0 голосов
/ 24 июня 2018

У меня есть несколько файлов .nt (NTriples) в каталоге. Я хочу прочитать каждый набор данных и сохранить соответствующие выходные значения в одной строке кадра данных.

Допустим, у меня есть dataset1.nt, dataset2.nt, ..., datasetn.nt. При чтении каждого набора данных используется следующий код:

val input = "src/main/resources/dataset1.nt"
val triplesRDD = NTripleReader.load(spark, JavaURI.create(input))
//NTripleReader reads .nt file and separates each line of dataset into subject, predicate and object     
/* My code to output number of distinct subjects, predicates and blank subjects in a dataset */

Допустим, набор данных1 дает следующий вывод:

  • Количество различных предметов: хххх
  • Количество различных предикатов: гг
  • Количество пустых предметов: zzz

Допустим, набор данных2 дает следующий вывод:

  • Количество различных предметов: ааааа
  • Количество различных предикатов: b
  • Количество пустых предметов: куб. См

и так далее ...

Когда я использую следующий код для чтения всех файлов в моем каталоге:

val input = "src/main/resources/*"
val triplesRDD = NTripleReader.load(spark, JavaURI.create(input))

Это дает мне следующий вывод:

  • Количество различных субъектов: xxxx + aaaaa + ... // добавление всех индивидуальных значений каждого набора данных
  • Количество различных предикатов: yy + b + ...
  • Количество пустых предметов: zzz + cc + ...

Однако я хочу, чтобы мой вывод был таким:

Distinct Subjects | Distinct Predicates | Blank Subjects
xxxx              | yy                  | zzz
aaaaa             | b                   | cc    
...               | ...                 | ...

Пожалуйста, дайте мне знать, как мне достичь желаемого результата.

Заранее спасибо.

1 Ответ

0 голосов
/ 27 июня 2018

Я отвечаю на мой вопрос. Я надеюсь, что это может быть полезно для других

import java.io.File
//import other necessary packages


object abc {
  var df1: DataFrame = _
  var df2: DataFrame = _         
  var df3: DataFrame = _

  def main(args: Array[String]):Unit = 
  {
    //initializing the spark session locally
    val spark = SparkSession.builder
          .master("local[*]")
          .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
          .appName("abc")
          .getOrCreate()

//    creates a list of all files in a directory:
    def getListOfFiles(dir: String):List[File] = 
    {
      val path = new File("path/to/directory/")
      if (path.exists && path.isDirectory) 
      {
        path.listFiles.filter(_.isFile).toList
      } 
      else 
      {
        List[File]()
      }
    }

      val files = getListOfFiles("path/to/directory/")
      val input = ""
      for (input <- files)
      {  
      //  println(input)
        val triplesRDD = NTripleReader.load(spark, JavaURI.create(input.toString()))

        /*code to generate dataframe columns value*/

        import spark.implicits._

        if(input == files(0))
        {
            df3 = Seq(
            (column1_value, column2_value, column3_value, column4_value, column5_value, column6_value)
            ).toDF("column1_name", "column2_name", "column3_name", "column4_name", "column5_name", "column6_name")
        } 
        else
        {    
            df1 = Seq(
            (column1_value, column2_value, column3_value, column4_value, column5_value, column6_value)
            ).toDF("column1_name", "column2_name", "column3_name", "column4_name", "column5_name", "column6_name")  
            df2 = df3.union(df1)
            df3 = df2
        }
      }
      df3.show()
// import dataframe to .csv file
          df3.coalesce(1).write
          .option("header", "true")
          .csv("path/to/directory/sample.csv")
          spark.stop
      }
    }
...