Как сохранить результат в текстовый файл в spark scala - PullRequest
0 голосов
/ 07 декабря 2018
package com.rl.billingsol
import org.apache.spark._
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.sql.Row
import org.apache.spark.sql.SQLContext
import org.apache.spark.rdd.RDD
import org.apache.spark.rdd
object billingSolution 
{
    def main (args:Array[String]) 
    {
        val conf = new SparkConf().setAppName("df operations").setMaster("local[2]")
        val sc = new SparkContext(conf)
        val sqlContext = new SQLContext(sc)

    import org.apache.spark.sql.types.{StringType, StructField, StructType}

    val schema_Attendance = new StructType()
        .add(StructField("Employee_ID", StringType, true))
        .add(StructField("Employee_Name", StringType, true)) 
        .add(StructField("Employee_Status(1-May-2018)", StringType, true))
        .add(StructField("Employee_Status(2-May-2018)", StringType, true))
        .add(StructField("Employee_Status(3-May-2018)", StringType, true))
        .add(StructField("Employee_Status(4-May-2018)", StringType, true))
        .add(StructField("Employee_Status(5-May-2018)", StringType, true))

        .add(StructField("Employee_Status(6-May-2018)", StringType, true))

        .add(StructField("Employee_Status(7-May-2018)", StringType, true))

        .add(StructField("Employee_Status(8-May-2018)", StringType, true))

        .add(StructField("Employee_Status(9-May-2018)", StringType, true))

        .add(StructField("Employee_Status(10-May-2018)", StringType, true)) 

        .add(StructField("Employee_Status(11-May-2018)", StringType, true))

        .add(StructField("Employee_Status(12-May-2018)", StringType, true))

        .add(StructField("Employee_Status(13-May-2018)", StringType, true))

        .add(StructField("Employee_Status(14-May-2018)", StringType, true))

        .add(StructField("Employee_Status(15-May-2018)", StringType, true))

        .add(StructField("Employee_Status(16-May-2018)", StringType, true))

        .add(StructField("Employee_Status(17-May-2018)", StringType, true)) 

        .add(StructField("Employee_Status(18-May-2018)", StringType, true))

        .add(StructField("Employee_Status(19-May-2018)", StringType, true))

        .add(StructField("Employee_Status(20-May-2018)", StringType, true))

        .add(StructField("Employee_Status(21-May-2018)", StringType, true))

        .add(StructField("Employee_Status(22-May-2018)", StringType, true))

        .add(StructField("Employee_Status(23-May-2018)", StringType, true))

        .add(StructField("Employee_Status(24-May-2018)", StringType, true)) 

        .add(StructField("Employee_Status(25-May-2018)", StringType, true))

        .add(StructField("Employee_Status(26-May-2018)", StringType, true))

        .add(StructField("Employee_Status(27-May-2018)", StringType, true))

        .add(StructField("Employee_Status(28-May-2018)", StringType, true))

        .add(StructField("Employee_Status(29-May-2018)", StringType, true))

        .add(StructField("Employee_Status(30-May-2018)", StringType, true))

        .add(StructField("Employee_Status(31-May-2018)", StringType, true))



val fileinput = sc.textFile("D:/inputfile.csv")                                       

        val filehead = fileinput.first()

        val attendance_without_header = fileinput.filter(line => !line.equals(filehead))

        val filehead_2 = attendance_without_header.first()

        val attendance_no_header = attendance_without_header.filter(line => !line.equals(filehead_2)) 

        val attendance_detail = attendance_no_header.map{x => x.split(",")}.map{x => Row(x(0),x(1),x(7),x(14),x(21),x(28),
                x(35),x(42),x(49),x(56),x(63),x(70),x(77),
                x(84),x(91),x(98),x(105),x(112),x(119),
                x(126),x(133),x(140),x(147),x(154),x(161),
                x(168),x(175),x(182),x(189),x(196),x(203),x(210),x(217))}

        val AttenDF = sqlContext.createDataFrame(attendance_detail, schema_Attendance)
        AttenDF.show()
    }
}

Ответы [ 2 ]

0 голосов
/ 07 декабря 2018

Я предлагаю вам прочитать и записать свой файл в формате CSV

из того места, где вы сейчас находитесь, выполните AttendDF.write.csv("path"), но если бы вы также spark.read.option("header","true").csv("inputfile.csv"), вам было бы проще обрабатывать оригиналфайл также

0 голосов
/ 07 декабря 2018

Мы можем сохранить результат в текстовом файле, используя следующий код в scala

df.write.text ("/ path / to / file")

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...