pyspark udf, который считается, когда условие удовлетворено - PullRequest
0 голосов
/ 15 января 2020

Проблема: у меня есть фрейм данных pyspark, который я хочу суммировать по столбцам и подсчитать для каждого идентификатора, отвечающего определенным условиям. Мой набор данных выглядит следующим образом:

my_dict = {'ID': {0: u'00319383',
  1: u'00337642',
  2: u'0346945',
  3: u'00400193',
  4: u'00405079',
  5: u'0426407',
  6: u'00445573',
  7: u'00485834',
  8: u'0493307',
  9: u'00501281'},
 'type_A': {0: u'A',
  1: u'A',
  2: u'A',
  3: u'A',
  4: u'A',
  5: u'A',
  6: u'A',
  7: u'A',
  8: u'A',
  9: u'A'},
 'type_B': {0: u'None',
  1: u'B',
  2: u'None',
  3: u'None',
  4: u'None',
  5: u'None',
  6: u'None',
  7: u'None',
  8: u'B',
  9: u'None'},
 'type_C': {0: u'C',
  1: u'C',
  2: u'C',
  3: u'C',
  4: u'C',
  5: u'C',
  6: u'C',
  7: u'C',
  8: u'C',
  9: u'C'},
 'type_D': {0: u'None',
  1: u'None',
  2: u'None',
  3: u'None',
  4: u'None',
  5: u'None',
  6: u'None',
  7: u'D',
  8: u'None',
  9: u'None'}}

Цель состоит в том, чтобы подсчитать вхождение продукта по идентификатору. Я разработал решение в SQL, которое делает то, что я хочу:

spark.sql('''
            select total, count(contract_id) as freq
            from 
            (
                select id, (typeA + typeB + typeC + typeD) as total
                from
                    (
                        select id
                        , case when type_A = 'A' then 1 else 0 end as typeA
                        , case when type_B = 'B' then 1 else 0 end as typeB 
                        , case when type_C = 'C' then 1 else 0 end as typeC  
                        , case when type_D = 'D' then 1 else 0 end as typeD  
                        from df 
                    ) a
            ) b

            group by total

         ''').toPandas()

Как я мог сделать это с помощью функции python / pyspark? Ищете идеи для решения такой проблемы?

1 Ответ

0 голосов
/ 15 января 2020

Хорошо, тогда это должно сработать:

from pyspark.sql.functions import *
df.select(
    (when(col("type_A") == lit("A"), lit(1)).otherwise(lit(0))+
    when(col("type_B") == lit("B"), lit(1)).otherwise(lit(0))+
    when(col("type_C") == lit("C"), lit(1)).otherwise(lit(0))+
    when(col("type_D") == lit("D"), lit(1)).otherwise(lit(0))).alias("total"), col("id")
).groupBy("total").agg(count(col("id")).alias("freq"))
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...