Число искр и процент для каждого значения столбца Обработка исключений и загрузка в базу данных Hive - PullRequest
0 голосов
/ 24 января 2019

В приведенном ниже коде Scala Spark мне нужно найти количество и его процент от значений различных столбцов. Для этого мне нужно использовать метод withColumn для каждого столбца, например date, usage, payment, dateFinal, usageFinal, paymentFinal.

Для каждого расчета мне нужно использовать withColumn, чтобы получить сумму и агрегацию. Есть ли способ, которым мне не нужно писать,

.withColumn("SUM", sum("count").over() ).withColumn("fraction", col("count") / sum("count").over()).withColumn("Percent", col("fraction") * 100 ).drop("fraction")

каждый раз? Например, как вы можете видеть в приведенном ниже коде.

var dateFinalDF = dateFinal.toDF(DateColumn).groupBy(DateColumn).count.withColumn("SUM", sum("count").over()).withColumn("fraction", col("count") /  sum("count").over()).withColumn("Percent", col("fraction") * 100   ).drop("fraction")  

var usageFinalDF = usageFinal.toDF(UsageColumn).groupBy(UsageColumn).count.withColumn("SUM", sum("count").over()).withColumn("fraction", col("count") /  sum("count").over()).withColumn("Percent", col("fraction") * 100   ).drop("fraction")  

var paymentFinalDF = paymentFinal.toDF(PaymentColumn).groupBy(PaymentColumn).count.withColumn("SUM", sum("count").over()).withColumn("fraction", col("count") /  sum("count").over()).withColumn("Percent", col("fraction") * 100).drop("fraction")

теперь мой код приведен ниже, поэтому вы можете помочь нам добавить условия для различных столбцов, таких как дата, использование и т. Д. (Например, в коде мы получили столбец, содержащий дату, чем мы добавили счетчик, и другие условия, которые мы хотим) теперь те вещи, которые мы хотим видеть динамичными, все имена столбцов должны находиться внутри одного файла yml и должны считывать эти имена из этого файла. Как я могу добиться этого, может кто-нибудь помочь, и после прочтения файла YML, как я буду изменять свой код, помогите.

object latest

{

 def main(args: Array[String])

  {


  var fileList = new ListBuffer[String]()
  var dateList = new ListBuffer[String]()
  var fileL = new ListBuffer[String]()

  var fileL1 = new ListBuffer[String]()

  val sparkConf = new SparkConf().setMaster("local[4]").setAppName("hbase sql")
  val sc = new SparkContext(sparkConf)
  val spark1 = SparkSession.builder().config(sc.getConf).getOrCreate()
  val sqlContext = spark1.sqlContext


   import spark1.implicits._

    def f1(number: Double)=
     { 
     "%.2f".format(number).toDouble
     }
    val udfFunc = udf(f1 _)   

    def getCountPercent(df: DataFrame): DataFrame =

    {
  df.withColumn("SUM", sum("count").over() )
    .withColumn("fraction", col("count") / sum("count").over())
    .withColumn("Percent", col("fraction") * 100 )
    .withColumn("number", udfFunc(col("Percent")))
    .drop("Percent")
    .drop("fraction")

    }


   def occurenceCount(df: DataFrame,column: String)    
   {

    var usageFinalDF = df.groupBy(column).count.transform(getCountPercent)           

   for (u <- usageFinalDF.collect())
 {
         fileList += column + '~' + u.mkString("~")
  }
}




     val headerCSV=spark1.sqlContext.read.format("CSV").option("header","true").option("delimiter", """|""").load("C:\\Users\\ayushgup\\Downloads\\Header3.csv")

    val columns = headerCSV.columns


  val data =   spark1.sqlContext.read.format("CSV").option("delimiter", """|""").load("C:/Users/ayushgup/Downloads/home_data_usage_2018122723_1372673.csv").toDF(columns:_*)

 for (coll <- columns.toList) 
    {

  if (coll.toLowerCase().contains("date")) 
    {

    for (datesss <- data.select(coll).collect()) 
    {
      dateList += datesss.toString().slice(1, 8)

    }

    var dateFinalDF = dateList.toList.toDF(coll)

    occurenceCount(dateFinalDF,coll)

     } 
    else if (coll.toLowerCase().contains("usage")) 
 {

    var r = data.select(coll).withColumn(coll, when(col(coll) <= 1026, "<=1gb").when(col(coll) > 1026 && col(coll) < 5130, "1-5gb")
      .when(col(coll) > 5130 && col(coll) < 10260, "5-10gb")
      .when(col(coll) > 10260 && col(coll) < 20520, "10-20gb")
      .when(col(coll) > 20520, ">20gb")
      .otherwise(0)).toDF(coll)

      occurenceCount(r,coll)

  } 
  else if (coll.toLowerCase().contains("paymentamount")) 
  {

    var r = data.select(coll).withColumn(coll, when(col(coll) <= 1500, "1-1500").when(col(coll) > 1500 && col(coll) < 1700, "1500-1700")
      .when(col(coll) > 1700 && col(coll) < 1900, "1700-1900")
      .when(col(coll) > 1900 && col(coll) < 2000, "1900-2000")
      .when(col(coll) > 2000, ">2000")
      .otherwise(0)).toDF(coll)

    occurenceCount(r,coll)

  } 
   else if (coll.toLowerCase().contains("accounttenure")) 
  {

    var r = data.select(coll).withColumn(coll, when(col(coll) > 1000000 && col(coll) < 5000000, "1-5m").when(col(coll) > 5000000 && col(coll) < 11000000, "5-11m")
      .when(col(coll) > 12000000 && col(coll) < 23000000, "12-23m")
      .when(col(coll) > 24000000 && col(coll) < 35000000, "24-35m")
      .when(col(coll) > 36000000, ">36m")
      .otherwise(0)).toDF(coll)

   occurenceCount(r,coll)
  } 
  else if (coll.toLowerCase().equals("arpu")) 
  {

    var r = data.select(coll).withColumn(coll, when(col(coll) <= 1500, "1-1500").when(col(coll) > 1500 && col(coll) < 1700, "1500-1700")
      .when(col(coll) > 1700 && col(coll) < 1900, "1700-1900")
      .when(col(coll) > 1900 && col(coll) < 2000, "1900-2000")
      .when(col(coll) > 2000, ">2000")
      .otherwise(0)).toDF(coll)

   occurenceCount(r,coll)

  } 
  else if (coll.equals("DisputeAmount") || coll.equals("ticketsAmount")) 
  {

    var r = data.select(coll).withColumn(coll, when(col(coll) === 0, "0").when(col(coll) > 0, ">0")
      .otherwise(1)).toDF(coll)

   occurenceCount(r,coll)

  } 
  else if (coll.equals("serviceOrdersCreatedLast90Days"))
  {

    var r = data.select(coll).withColumn(coll, when(col(coll) === 0, "0").when(col(coll) === 1, "1")
      .when(col(coll) === 2, "2")
      .when(col(coll) === 3, "3")
      .when(col(coll) > 3, ">3"))
      .toDF(coll)

   occurenceCount(r,coll)
  } 
  else 
  {

    import spark1.implicits._

   val actData1 = data.groupBy(coll).count().transform(getCountPercent) 



    occurenceCount(actData1,coll)

  }
   }

val f = fileList.toList
for (flist <- f) 
   {

   fileL += flist.replaceAll("[\\[\\]]", "")

   }

   var ff = fileL.toDF()


   var df1: DataFrame = ff.selectExpr("split(value, '~')[0] as 
     Attribute", "split(value, '~')[1] as Value","split(value, '~')[2] as 
     Count","split(value, '~')[3] as Sum","split(value, '~')[4] as 
    Percent");

   }

   }

Ответы [ 2 ]

0 голосов
/ 24 января 2019

Другой способ ... вроде коллекций scala - zip / map style: -)

scala> val df = Seq((10,20,30),(15,25,35)).toDF("date", "usage", "payment")
df: org.apache.spark.sql.DataFrame = [date: int, usage: int ... 1 more field]

scala> df.show(false)
+----+-----+-------+
|date|usage|payment|
+----+-----+-------+
|10  |20   |30     |
|15  |25   |35     |
+----+-----+-------+


scala> df.columns
res75: Array[String] = Array(date, usage, payment)

scala> var df2,df3,df4 = df
df2: org.apache.spark.sql.DataFrame = [date: int, usage: int ... 1 more field]
df3: org.apache.spark.sql.DataFrame = [date: int, usage: int ... 1 more field]
df4: org.apache.spark.sql.DataFrame = [date: int, usage: int ... 1 more field]

scala> val arr_all = Array(df2,df3,df4).zip(df.columns).map( d => d._1.groupBy(d._2).count.withColumn("sum",sum('count).over()).withColumn("fraction", col("count") / sum("count").over()).withColumn("Percent", col("fraction") * 100 ).drop("fraction") )
arr_all: Array[org.apache.spark.sql.DataFrame] = Array([date: int, count: bigint ... 2 more fields], [usage: int, count: bigint ... 2 more fields], [payment: int, count: bigint ... 2 more fields])

scala> val Array(dateFinalDF,usageFinalDF,paymentFinalDF) = arr_all
dateFinalDF: org.apache.spark.sql.DataFrame = [date: int, count: bigint ... 2 more fields]
usageFinalDF: org.apache.spark.sql.DataFrame = [usage: int, count: bigint ... 2 more fields]
paymentFinalDF: org.apache.spark.sql.DataFrame = [payment: int, count: bigint ... 2 more fields]

scala> dateFinalDF.show(false)
2019-01-25 04:10:10 WARN  WindowExec:66 - No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
+----+-----+---+-------+
|date|count|sum|Percent|
+----+-----+---+-------+
|15  |1    |2  |50.0   |
|10  |1    |2  |50.0   |
+----+-----+---+-------+


scala> usageFinalDF.show(false)
2019-01-25 04:10:20 WARN  WindowExec:66 - No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
+-----+-----+---+-------+
|usage|count|sum|Percent|
+-----+-----+---+-------+
|20   |1    |2  |50.0   |
|25   |1    |2  |50.0   |
+-----+-----+---+-------+


scala> paymentFinalDF.show(false)
2019-01-25 04:10:50 WARN  WindowExec:66 - No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
+-------+-----+---+-------+
|payment|count|sum|Percent|
+-------+-----+---+-------+
|35     |1    |2  |50.0   |
|30     |1    |2  |50.0   |
+-------+-----+---+-------+


scala>

Обратите внимание, что я разбил и включил var (df2,df3,df4) = df, так что будет легко выполнить шаги.

Все они могут быть объединены следующим образом.

scala> val Array(dateFinalDF,usageFinalDF,paymentFinalDF) = Array(df,df,df).zip(df.columns).map( d => d._1.groupBy(d._2).count.withColumn("sum",sum('count).over()).withColumn("fraction", col("count") / sum("count").over()).withColumn("Percent", col("fraction") * 100 ).drop("fraction") )
dateFinalDF: org.apache.spark.sql.DataFrame = [date: int, count: bigint ... 2 more fields]
usageFinalDF: org.apache.spark.sql.DataFrame = [usage: int, count: bigint ... 2 more fields]
paymentFinalDF: org.apache.spark.sql.DataFrame = [payment: int, count: bigint ... 2 more fields]

scala>
0 голосов
/ 24 января 2019

Вы можете инкапсулировать все операции .withColumn() в функции, которая возвращает DataFrame после применения всех операций.

def getCountPercent(df: DataFrame): DataFrame = {
  df.withColumn("SUM", sum("count").over() )
    .withColumn("fraction", col("count") / sum("count").over())
    .withColumn("Percent", col("fraction") * 100 )
    .drop("fraction")
}  

Использование:

используйте .transform() для применения функции:

var dateFinalDF = dateFinal.toDF(DateColumn).groupBy(DateColumn).count.transform(getCountPercent)
var usageFinalDF = usageFinal.toDF(UsageColumn).groupBy(UsageColumn).count.transform(getCountPercent)
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...