Антигруппа по / р применяется в Писпарке - PullRequest
1 голос
/ 17 апреля 2020

Я программист R, двигаюсь в мир pyspark и получил много базовых c трюков, но я все еще борюсь из-за того, что я бы применил, или использую c для циклов.

В этом случае я пытаюсь вычислить «антигрупповую» для идентификатора. По сути, идея состоит в том, чтобы посмотреть на популяцию для этого идентификатора, а затем на популяцию не для этого идентификатора, и иметь оба этих значения в одной строке. Получить заполнение для этого идентификатора легко, используя groupby, а затем присоединить его к набору данных с new_id в качестве единственного столбца.

Вот как бы я это сделал в R:

anti_group <- function(id){
    tr <- sum(subset(df1, new_id!=id)$total_1)
    to <- sum(subset(df1, new_id!=id)$total_2)
    54 * tr / to
  }
  test$other.RP54 <- sapply(test$new_id, anti_group  )

Как бы я это сделал в pyspark?

Спасибо!

Редактировать:

#df.show()
#sample data
+---+-----+
| id|value|
+---+-----+
|  1|   40|
|  1|   30|
|  2|   10|
|  2|   90|
|  3|   20|
|  3|   10|
|  4|    2|
|  4|    5|
+---+-----+

Затем некоторая функция, которая создает окончательный кадр данных, который выглядит следующим образом:

+---+-------------+------------------+
| id|grouped_total|anti_grouped_total|
+---+-------------+------------------+
|  1|           70|               137|
|  2|          100|               107|
|  3|           30|               177|
|  4|            7|               200|
+---+-------------+------------------+

Ответы [ 2 ]

1 голос
/ 17 апреля 2020

Я думаю, что вы можете сделать это в два этапа: сначала вы суммируете по идентификатору, затем берете сумму и вычитаете значение этого идентификатора.

Моя идея немного похожа на group_by(id) %>% summarise(x = sum(x)) %>% mutate(y = sum(x) - x) в dplyr

Предлагаемое мной решение основано на функции Window. Это не проверено:

Давайте сначала создадим данные

import pyspark.sql.functions as psf
import pyspark.sql.window as psw

df = spark.createDataFrame([(1,40),(1,30),(2,10),(2,90),(3,20),(3,10),(4,2),(4,5)], ['id','value'])

df.show(2)

+---+-----+
| id|value|
+---+-----+
|  1|   40|
|  1|   30|
+---+-----+
only showing top 2 rows

, а затем применим этот подход:

w = psw.Window.orderBy()
df_id = df.groupBy("id").agg(psf.sum("value").alias("grouped_total"))
df_id = (df_id
          .withColumn("anti_grouped_total",psf.sum("grouped_total").over(w))
          .withColumn('anti_grouped_total', psf.col('anti_grouped_total') - psf.col('grouped_total'))
        )

df_id.show(2)
+---+-------------+------------------+
| id|grouped_total|anti_grouped_total|
+---+-------------+------------------+
|  3|           30|               177|
|  1|           70|               137|
+---+-------------+------------------+
only showing top 2 rows

1 голос
/ 17 апреля 2020

Так что нет встроенной функции, которая бы копировала эту функцию groupBy, но вы могли бы легко сделать это, создав новый столбец, используя case(when/otherwise clause) для создания вашей группы и антигруппы , а затем groupBy на этом new column.

#df.show()
#sample data
+---+-----+
| id|value|
+---+-----+
|  1|   40|
|  1|   30|
|  2|   10|
|  2|   90|
|  3|   20|
|  3|   10|
|  4|    2|
|  4|    5|
+---+-----+

from pyspark.sql import functions as F
df.withColumn("anti_id_1", F.when(F.col("id")==1, F.lit('1')).otherwise(F.lit('Not_1')))\
  .groupBy("anti_id_1").agg(F.sum("value").alias("sum")).show()

+---------+---+
|anti_id_1|sum|
+---------+---+
|        1| 70|
|    Not_1|137|
+---------+---+

UPDATE:

from pyspark.sql.window import Window
from pyspark.sql import functions as F

w1=Window().partitionBy("id")
w=Window().partitionBy()
df.withColumn("grouped_total",F.sum("value").over(w1))\
  .withColumn("anti_grouped_total", (F.sum("value").over(w))-F.col("grouped_total"))\
  .groupBy("id").agg(F.first("grouped_total").alias("grouped_total"),\
                     F.first("anti_grouped_total").alias("anti_grouped_total"))\
  .drop("value").orderBy("id").show()


+---+-------------+------------------+
| id|grouped_total|anti_grouped_total|
+---+-------------+------------------+
|  1|           70|               137|
|  2|          100|               107|
|  3|           30|               177|
|  4|            7|               200|
+---+-------------+------------------+

Less verbose/concise способ достижения того же результата:

from pyspark.sql import functions as F
from pyspark.sql.window import Window
w = Window().partitionBy()
df.groupBy("id").agg(F.sum("value").alias("grouped_total"))\
          .withColumn("anti_grouped_total",F.sum("grouped_total").over(w)-F.col("grouped_total")).orderBy("id"),show()

For 2 value columns:

df.show()
+---+------+------+
| id|value1|value2|
+---+------+------+
|  1|    40|    50|
|  1|    30|    70|
|  2|    10|    91|
|  2|    90|    21|
|  3|    20|    42|
|  3|    10|     4|
|  4|     2|    23|
|  4|     5|    12|
+---+------+------+

from pyspark.sql.window import Window
from pyspark.sql import functions as F

w = Window().partitionBy()
df.groupBy("id").agg(F.sum("value1").alias("grouped_total_1"),F.sum("value2").alias("grouped_total_2"))\
          .withColumn("anti_grouped_total_1",F.sum("grouped_total_1").over(w)-F.col("grouped_total_1"))\
          .withColumn("anti_grouped_total_2",F.sum("grouped_total_2").over(w)-F.col("grouped_total_2")).orderBy("id").show()

+---+---------------+---------------+--------------------+--------------------+
| id|grouped_total_1|grouped_total_2|anti_grouped_total_1|anti_grouped_total_2|
+---+---------------+---------------+--------------------+--------------------+
|  1|             70|            120|                 137|                 193|
|  2|            100|            112|                 107|                 201|
|  3|             30|             46|                 177|                 267|
|  4|              7|             35|                 200|                 278|
+---+---------------+---------------+--------------------+--------------------+
...