How do I calculate a percentage in a Spark SQL data frame? - PullRequest
0 votes
/ 04 July 2018

I have an Aadhaar card dataset. I need to find the top three states where the percentage of Aadhaar cards generated for males is the highest. The dataset contains:

Date,Registrar,Private_Agency,State,District,Sub_District,PinCode,Gender,Age,AadharGenerated,EnrolmentRejected,MobileNumProvided
20150420,Civil Supplies - A&N Islands,India Computer Technology,Andaman and Nicobar Islands,South Andaman,Ferrargunj,744105,F,91,1,0,0
20150420,Civil Supplies - A&N Islands,India Computer Technology,Andaman and Nicobar Islands,South Andaman,Port Blair,744101,F,4,1,0,0
20150420,Civil Supplies - A&N Islands,India Computer Technology,Andaman and Nicobar Islands,South Andaman,Port Blair,744101,F,5,1,0,0
20150420,Civil Supplies - A&N Islands,India Computer Technology,Andaman and Nicobar Islands,South Andaman,Port Blair,744101,F,8,1,0,0
20150420,Civil Supplies - A&N Islands,India Computer Technology,Andaman and Nicobar Islands,South Andaman,Port Blair,744101,F,11,1,0,0
20150420,Civil Supplies - A&N Islands,India Computer Technology,Andaman and Nicobar Islands,South Andaman,Port Blair,744101,F,12,1,0,0
20150420,Civil Supplies - A&N Islands,India Computer Technology,Andaman and Nicobar Islands,South Andaman,Port Blair,744101,F,17,1,0,0
20150420,Civil Supplies - A&N Islands,India Computer Technology,Andaman and Nicobar Islands,South Andaman,Port Blair,744101,F,28,2,0,0
20150420,Civil Supplies - A&N Islands,India Computer Technology,Andaman and Nicobar Islands,South Andaman,Port Blair,744101,F,30,1,0,0
20150420,Civil Supplies - A&N Islands,India Computer Technology,Andaman and Nicobar Islands,South Andaman,Port Blair,744101,F,31,1,0,0
20150420,Civil Supplies - A&N Islands,India Computer Technology,Andaman and Nicobar Islands,South Andaman,Port Blair,744101,F,34,2,0,0
20150420,Civil Supplies - A&N Islands,India Computer Technology,Andaman and Nicobar Islands,South Andaman,Port Blair,744101,F,39,1,0,0
20150420,Civil Supplies - A&N Islands,India Computer Technology,Andaman and Nicobar Islands,South Andaman,Port Blair,744101,F,44,1,0,0
20150420,Civil Supplies - A&N Islands,India Computer Technology,Andaman and Nicobar Islands,South Andaman,Port Blair,744101,M,29,1,0,0
20150420,Civil Supplies - A&N Islands,India Computer Technology,Andaman and Nicobar Islands,South Andaman,Port Blair,744101,M,38,1,0,0
20150420,Civil Supplies - A&N Islands,India Computer Technology,Andaman and Nicobar Islands,South Andaman,Port Blair,744101,M,45,1,0,0
20150420,Civil Supplies - A&N Islands,India Computer Technology,Andaman and Nicobar Islands,South Andaman,Port Blair,744101,M,64,1,0,0
20150420,Civil Supplies - A&N Islands,India Computer Technology,Andaman and Nicobar Islands,South Andaman,Port Blair,744101,M,66,1,0,0
20150420,Civil Supplies - A&N Islands,India Computer Technology,Andaman and Nicobar Islands,South Andaman,Port Blair,744101,M,75,1,0,0
20150420,Civil Supplies - A&N Islands,India Computer Technology,Andaman and Nicobar Islands,South Andaman,Port Blair,744103,F,9,1,0,0
20150420,Civil Supplies - A&N Islands,India Computer Technology,Andaman and Nicobar Islands,South Andaman,Port Blair,744103,F,44,1,0,0
20150420,Civil Supplies - A&N Islands,India Computer Technology,Andaman and Nicobar Islands,South Andaman,Port Blair,744103,F,54,1,0,0
20150420,Civil Supplies - A&N Islands,India Computer Technology,Andaman and Nicobar Islands,South Andaman,Port Blair,744103,F,59,1,0,0
20150420,Civil Supplies - A&N Islands,India Computer Technology,Andaman and Nicobar Islands,South Andaman,Port Blair,744103,M,27,1,0,0
20150420,Civil Supplies - A&N Islands,India Computer Technology,Andaman and Nicobar Islands,South Andaman,Port Blair,744103,M,29,1,0,0
20150420,Bank Of India,Frontech Systems Pvt Ltd,Andhra Pradesh,Krishna,Kanchikacherla,521185,M,40,1,0,0
20150420,CSC e-Governance Services India Limited,BASIX,Andhra Pradesh,Srikakulam,Veeraghattam,532460,F,24,1,0,0

I tried the following, but I get an error:

sqlC.sql("SELECT STATE, (MALEADHAR/ADHAARDATA*100) AS PERCENTMALE
          FROM
              (SELECT STATE, SUM(ADHAARDATA) AS MALEADHAR
               FROM
                   (SELECT State, SUM(AadharGenerated) AS ADHAARDATA
                    FROM data GROUP BY State)
               WHERE Gender==='M') AS MALEADHAR
          GROUP BY STATE")

sqlC.sql("SELECT STATE, SUM(AadharGenerated) AS MALEADAHAR FROM data WHERE Gender='M' GROUP BY STATE")

Please help me correct the query.

Thanks, Ankit

Answers [ 5 ]

0 votes
/ 17 May 2019

Instead of SQL queries you can simply use Spark's built-in functions. To use them, first create a data frame from the data:

import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType}
import org.apache.spark.sql.functions._

// Schema (matches the 12 columns in the question's CSV header)
val schema = StructType(Array(
  StructField("date", IntegerType, true),
  StructField("registrar", StringType, true),
  StructField("private_agency", StringType, true),
  StructField("state", StringType, true),
  StructField("district", StringType, true),
  StructField("sub_district", StringType, true),
  StructField("pincode", IntegerType, true),
  StructField("gender", StringType, true),
  StructField("age", IntegerType, true),
  StructField("aadhar_generated", IntegerType, true),
  StructField("rejected", IntegerType, true),
  StructField("mobile_number", IntegerType, true)
))

// Loading data (the sample shown in the question includes a header row)
val data = spark.read.option("header", "true").schema(schema).csv("aadhaar_data.csv")

// Query: male totals per state, highest first
val maleByState = data
  .groupBy("state", "gender")
  .agg(sum("aadhar_generated").alias("male_generated"))
  .filter(col("gender") === "M")
  .orderBy(desc("male_generated"))

maleByState.show()
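The query above ranks states by the male total, but the question asks for the male *percentage*, so the male sum still has to be divided by each state's overall total. A minimal sketch building on the `data` frame loaded in this answer (illustrative only; the column names follow the schema above):

```scala
import org.apache.spark.sql.functions._

// Per-state totals (all genders) and per-state male totals,
// joined so the percentage can be computed per state.
val totals = data.groupBy("state")
  .agg(sum("aadhar_generated").alias("total_generated"))

val males = data.filter(col("gender") === "M")
  .groupBy("state")
  .agg(sum("aadhar_generated").alias("male_generated"))

val topThree = males.join(totals, Seq("state"))
  .withColumn("percent_male", col("male_generated") / col("total_generated") * 100)
  .orderBy(desc("percent_male"))
  .limit(3)

topThree.show()
```

A state with no male rows drops out of the inner join; a left join from `totals` with `coalesce` would report it as 0% instead.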
0 votes
/ 06 July 2018

A simpler approach: not just nested SQL, which is of course also possible, but a more step-by-step approach that uses both SQL and DataFrames.

Note that the absence of a given combination (of c1 in this case) means 0%, which could be handled differently.

You can now adapt this in the same way; I have used similar variable names. You can order, drop, and rename the columns.

import org.apache.spark.sql.functions._

val df = sc.parallelize(Seq(
  ("A", "X", 2, 100, "M", "Y"), ("F", "X", 7, 100, "M", "Y"), ("B", "X", 10, 100, "F", "Y"),
  ("C", "X", 1, 100, "F", "N"), ("D", "X", 50, 100, "M", "N"), ("E", "X", 30, 100, "M", "Y"),
  ("D", "X", 1, 100, "F", "N"), ("A", "X", 50, 100, "M", "N"), ("A", "X", 30, 100, "M", "Y"),
  ("D", "X", 1, 100, "M", "N"), ("X", "X", 50, 100, "M", "Y"), ("A", "X", 30, 100, "F", "Y"),
  ("K", "X", 1, 100, "M", "N"), ("K", "X", 50, 100, "M", "Y")
)).toDF("c1", "c2", "Val1", "Val2", "male_Female_Flag", "has_This")

df.createOrReplaceTempView("SOQTV")

spark.sql(
   "select * " +
   "from SOQTV " +
   "where 1 = 1 order by 1,5,6 ").show()

val dfA = spark.sql(" SELECT c1, count(*) " +
      " FROM SOQTV " + 
      " WHERE male_Female_Flag = 'M' " +
      " GROUP BY c1 ")

 val dfB = spark.sql(" SELECT c1, count(*) " +
      " FROM SOQTV " + 
      " WHERE male_Female_Flag = 'M' AND has_This = 'Y' " +
      " GROUP BY c1 ")

 val dfC = dfB.join(dfA, dfA("c1") === dfB("c1"), "inner")
 val colNames = Seq("c1", "Male_Has_Something", "c1_Again", "Male")
 val dfD = dfC.toDF(colNames: _*)

 dfC.show
 dfD.show
 dfD.withColumn("Percentage", (col("Male_Has_Something") / col("Male")) * 100 ).show

This gives:

 +---+------------------+--------+----+-----------------+
 | c1|Male_Has_Something|c1_Again|Male|       Percentage|
 +---+------------------+--------+----+-----------------+
 |  K|                 1|       K|   2|             50.0|
 |  F|                 1|       F|   1|            100.0|
 |  E|                 1|       E|   1|            100.0|
 |  A|                 2|       A|   3|66.66666666666666|
 |  X|                 1|       X|   1|            100.0|
 +---+------------------+--------+----+-----------------+
0 votes
/ 05 July 2018

Following on from this, I remembered a better approach!

import org.apache.spark.sql.expressions._
import org.apache.spark.sql.functions._

val df = sc.parallelize(Seq(
   ("A", "X", 2, 100), ("A", "X", 7, 100), ("B", "X", 10, 100),
   ("C", "X", 1, 100), ("D", "X", 50, 100), ("E", "X", 30, 100)
    )).toDF("c1", "c2", "Val1", "Val2")

val df2 = df
  .groupBy("c1")
  .agg(sum("Val1").alias("sum"))
  .withColumn("fraction", col("sum") /  sum("sum").over())

df2.show
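The same `over()` trick can be adapted to the original gender question by partitioning the window by state. A sketch, assuming the question's CSV has been loaded into a data frame `data` that keeps its original header names (that frame and those names are assumptions here, not shown in this answer):

```scala
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// Sum per (State, Gender), then divide by the state total
// computed with a window partitioned by State.
val byStateGender = data.groupBy("State", "Gender")
  .agg(sum("AadharGenerated").alias("generated"))

val withPct = byStateGender.withColumn(
  "percent",
  col("generated") / sum("generated").over(Window.partitionBy("State")) * 100)

withPct.filter(col("Gender") === "M")
  .orderBy(desc("percent"))
  .show(3)
```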
0 votes
/ 05 July 2018

Also doable with SQL; just add extra filtering and so on. The buck stops here.

df.createOrReplaceTempView("SOQTV")

spark.sql(" SELECT c1, SUM(Val1) / (SELECT SUM(Val1) FROM SOQTV) as Perc_Total_for_SO_Question  " +
      " FROM SOQTV " + 
      " GROUP BY c1 ").show()

The same answers are obtained.
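Adapted to the original male-percentage question, the single scalar subquery becomes a per-state total, which is easiest to express as a join of two aggregations. A sketch, assuming a temp view `data` registered over the question's dataset (an assumption; only the `SOQTV` view exists in this answer):

```scala
val topMalePct = spark.sql("""
  SELECT m.State,
         m.male_total / t.state_total * 100 AS percent_male
  FROM (SELECT State, SUM(AadharGenerated) AS male_total
        FROM data WHERE Gender = 'M' GROUP BY State) m
  JOIN (SELECT State, SUM(AadharGenerated) AS state_total
        FROM data GROUP BY State) t
    ON m.State = t.State
  ORDER BY percent_male DESC
  LIMIT 3
""")

topMalePct.show()
```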

0 votes
/ 05 July 2018

Apply this after reviewing this simple approach. Other ways are also possible, but it is a simple approach that you can adapt accordingly, to one grouping or another, with filtering and so on. Some research is required.

import org.apache.spark.sql.functions._

val df = sc.parallelize(Seq(
   ("A", "X", 2, 100), ("A", "X", 7, 100), ("B", "X", 10, 100),
   ("C", "X", 1, 100), ("D", "X", 50, 100), ("E", "X", 30, 100)
    )).toDF("c1", "c2", "Val1", "Val2")

val total = df.select(col("Val1")).rdd.map(_(0).asInstanceOf[Int]).reduce(_+_)
// Or val total2: Long = df.agg(sum("Val1").cast("long")).first.getLong(0)

val df2 = df.groupBy($"c1").sum("Val1")
val df3 = df2.withColumn("perc_total", ($"sum(val1)" / total))

df3.show

This gives:

+---+---------+----------+
| c1|sum(Val1)|perc_total|
+---+---------+----------+
|  E|       30|       0.3|
|  B|       10|       0.1|
|  D|       50|       0.5|
|  C|        1|      0.01|
|  A|        9|      0.09|
+---+---------+----------+