Как агрегировать значения 2 столбцов в кадре данных искры - PullRequest
0 голосов
/ 29 октября 2018

У меня есть DataFrame с 4 столбцами.

+---------------+----------------------+---------------+-------------+          
|       district|sum(aadhaar_generated)|       district|sum(rejected)|
+---------------+----------------------+---------------+-------------+
|         Namsai|                     5|         Namsai|            0|
|      Champawat|                  1584|      Champawat|          131|
|         Nagaur|                 12601|         Nagaur|          697|
|         Umaria|                  2485|         Umaria|          106|
|    Rajnandgaon|                   785|    Rajnandgaon|           57|
| Chikkamagaluru|                   138| Chikkamagaluru|           26|
|Tiruchirappalli|                   542|Tiruchirappalli|          527|
|       Baleswar|                  2963|       Baleswar|         1703|
|       Pilibhit|                  1858|       Pilibhit|          305|
+---------------+----------------------+---------------+-------------+

Мне нужно добавить соответствующие значения позиции суммы (aadhaar_generated) и суммы (отклонено)

Например: для второго ряда мой o / p должен быть:

+---------------+------------+          
|       district|  total sum |                                                                   
+---------------+------------+
|      Champawat| 1715       |
+---------------+------------+

т.е. 1584+131= 1715

Как я могу добиться того же в Scala.

Ответы [ 2 ]

0 голосов
/ 29 октября 2018

Не могли бы вы попробовать ниже фрагмент

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.DoubleType
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.types.StructField

val spark = SparkSession
  .builder()
  .config("spark.master", "local[1]")
  .appName("Test Job")
  .getOrCreate()

import spark.implicits._
val sparkContext = spark.sparkContext
sparkContext.setLogLevel("WARN")

//DEFINING INPUT
val inputDF = StructType(Array(StructField("district", StringType, false),
  StructField("sum(aadhaar_generated)", DoubleType, false),
  StructField("district_name", StringType, false),
  StructField("sum(rejected)", DoubleType, false)))

//READING INPUT FILE
val dF = spark.read.format("csv").option("sep", ",")
  .option("header", true)
  .option("timestampFormat", "yyyy/MM/dd HH:mm:ss ZZ")
  .schema(inputDF)
  .load("path\\to\\file");

println("Input DF")
dF.show()

var aggDF = dF.withColumn("Sum_Value", $"sum(aadhaar_generated)" + $"sum(rejected)")
println("After Aggregation")
aggDF.show()

OUTPUT

Input DF
+---------------+----------------------+---------------+-------------+
|       district|sum(aadhaar_generated)|  district_name|sum(rejected)|
+---------------+----------------------+---------------+-------------+
|         Namsai|                   5.0|         Namsai|          0.0|
|      Champawat|                1584.0|      Champawat|        131.0|
|         Nagaur|               12601.0|         Nagaur|        697.0|
|         Umaria|                2485.0|         Umaria|        106.0|
|    Rajnandgaon|                 785.0|    Rajnandgaon|         57.0|
| Chikkamagaluru|                 138.0| Chikkamagaluru|         26.0|
|Tiruchirappalli|                 542.0|Tiruchirappalli|        527.0|
|       Baleswar|                2963.0|       Baleswar|       1703.0|
|       Pilibhit|                1858.0|       Pilibhit|        305.0|
+---------------+----------------------+---------------+-------------+

After Aggregation
+---------------+----------------------+---------------+-------------+---------+
|       district|sum(aadhaar_generated)|  district_name|sum(rejected)|Sum_Value|
+---------------+----------------------+---------------+-------------+---------+
|         Namsai|                   5.0|         Namsai|          0.0|      5.0|
|      Champawat|                1584.0|      Champawat|        131.0|   1715.0|
|         Nagaur|               12601.0|         Nagaur|        697.0|  13298.0|
|         Umaria|                2485.0|         Umaria|        106.0|   2591.0|
|    Rajnandgaon|                 785.0|    Rajnandgaon|         57.0|    842.0|
| Chikkamagaluru|                 138.0| Chikkamagaluru|         26.0|    164.0|
|Tiruchirappalli|                 542.0|Tiruchirappalli|        527.0|   1069.0|
|       Baleswar|                2963.0|       Baleswar|       1703.0|   4666.0|
|       Pilibhit|                1858.0|       Pilibhit|        305.0|   2163.0|
+---------------+----------------------+---------------+-------------+---------+

Пожалуйста, дайте мне знать, если это работает.

0 голосов
/ 29 октября 2018

EDIT

В следующем ответе предполагается, что значение district в обоих столбцах каждой строки одинаково.


Вы можете сделать это, используя withColumn метод фреймов данных spark

# create some data
>>> data = [['a', 1, 2], ['a', 2, 2], ['b', 4, 3]]
>>> df =spark.createDataFrame(data, ['district','aadhar_generated', 'rejected'])
>>> df.show()
+--------+----------------+--------+
|district|aadhar_generated|rejected|
+--------+----------------+--------+
|       a|               1|       2|
|       a|               2|       2|
|       b|               4|       3|
+--------+----------------+--------+

# create the output column
>>> import pyspark.sql.functions as F
>>> df = df.withColumn("new total", F.col('aadhar_generated')+F.col('rejected'))
>>> df.show()
+--------+----------------+--------+---------+
|district|aadhar_generated|rejected|new total|
+--------+----------------+--------+---------+
|       a|               1|       2|        3|
|       a|               2|       2|        4|
|       b|               4|       3|        7|
+--------+----------------+--------+---------+
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...