Как проверить логическое условие с другого Dataframe - PullRequest
0 голосов
/ 02 июня 2019

У меня есть три DF первое - база df второе - поведение df и третье - правило df

Base df:
+---+----+------+
| ID|Name|Salary|
+---+----+------+
|  1|   A|   100|
|  2|   B|   200|
|  3|   C|   300|
|  4|   D|  1000|
|  5|   E|   500|
+---+----+------+

Behavior DF:
+----+---------+------+
|S.NO|Operation|Points|
+----+---------+------+
|   1|  a AND b|   100|
|   2|   a OR b|   200|
|   3|otherwise|     0|
+----+---------+------+

Rule DF:
+----+-----+------+------------+-----+
|RULE|Table|   col|   operation|value|
+----+-----+------+------------+-----+
|   a| Base|Salary|       equal| 1000|
|   b| Base|Salary|Greater Than|  500|
+----+-----+------+------------+-----+

Я хочу рассчитать вознаграждение каждого человека и добавитьстолбец в базе df по имени вознаграждения и проверьте условие в поведении df. Если a и b истинно, он присваивает 100 баллов или, если OR b истинно, поэтому 200 баллов будут назначаться в противном случае, 0 баллов будут назначать, где условие a или bв таблице правил

Expected DF
 +---+----+------+------+
| ID|Name|Salary|Reward|
+---+----+------+------+
|  1|   A|   100|     0|
|  2|   B|   200|     0|
|  3|   C|   300|     0|
|  4|   D|  1000|   200|
|  5|   E|   500|     0|
+---+----+------+------+

1 Ответ

0 голосов
/ 03 июня 2019

Вы можете следовать этому подходу -

Я должен внести небольшие изменения в Rule и Behavior фрейм данных.Сохраненные операции в виде логики ("==") вместо строки ("равный").

Base = spark.createDataFrame([(1,'A',100),(2,'B',200),(3,'C',300),(4,'D',1000),(5,'E',500)],['ID','Name','Salary'])

Behavior = spark.createDataFrame([(1,'df.rule_a & df.rule_b',100),(2,'df.rule_a | df.rule_b',200),(3,'otherwise',0)],['SNo','Operation','Points'])

Rule = spark.createDataFrame([(1,'Base','Salary','==',1000),(2,'Base','Salary','>',500)],['RULE','Table','col','operation','value'])

Base.show()

#+---+----+------+
#| ID|Name|Salary|
#+---+----+------+
#|  1|   A|   100|
#|  2|   B|   200|
#|  3|   C|   300|
#|  4|   D|  1000|
#|  5|   E|   500|
#+---+----+------+

Behavior.show()

#+---+---------------------+------+
#|SNo|           Operation |Points|
#+---+---------------------+------+
#|  1|df.rule_a & df.rule_b|   100|
#|  2|df.rule_a | df.rule_b|   200|
#|  3|           otherwise |     0|
#+---+---------------------+------+

Rule.show()

#+----+-----+------+---------+-----+
#|RULE|Table|   col|operation|value|
#+----+-----+------+---------+-----+
#|   1| Base|Salary|       ==| 1000|
#|   2| Base|Salary|        >|  500|
#+----+-----+------+---------+-----+

Подготовьте логику для правил, хранящихся в Rules dataframe

Для динамически готовящихся правил выможет запустить for loop над Rule фреймом данных и передать номер итерации для фильтрации преобразования и преобразования правила.

from pyspark.sql.functions import  col,concat,lit

rule_a = Rule.filter("RULE == 1").select(concat(col("Table"), lit("['"),  col("col"), lit("']"), lit(" "),  col("Operation"), col("Value"))).collect()[0][0]

rule_b = Rule.filter("RULE == 2").select(concat(col("Table"), lit("['"),  col("col"), lit("']"), lit(" "),  col("Operation"), col("Value"))).collect()[0][0]

Добавить логический результат выполнения правил в хранилище данных

df = Base.withColumn("rule_a", eval(rule_a)).withColumn("rule_b", eval(rule_b))

df.show()

#+---+----+------+------+------+
#| ID|Name|Salary|rule_a|rule_b|
#+---+----+------+------+------+
#|  1|   A|   100| false| false|
#|  2|   B|   200| false| false|
#|  3|   C|   300| false| false|
#|  4|   D|  1000|  true|  true|
#|  5|   E|   500| false| false|
#+---+----+------+------+------+

поведение и соответствующие точки от Behavior dataframe к переменным

Для динамически готовящихся переменных вы можете запустить for loop над поведением dataframe и передать номер итерации в качестве переменной для фильтрации преобразования и имени столбца.

behavior1 = Behavior.filter("SNo==1").select( col("Operation")).collect()[0][0]
behavior1_points = Behavior.filter("SNo==1").select( col("Points")).collect()[0][0]

behavior2 = Behavior.filter("SNo==2").select( col("Operation")).collect()[0][0]
behavior2_points = Behavior.filter("SNo==2").select( col("Points")).collect()[0][0]

behavior3 = Behavior.filter("SNo==3").select( col("Operation")).collect()[0][0]
behavior3_points = Behavior.filter("SNo==3").select( col("Points")).collect()[0][0]

Окончательное решение

from pyspark.sql.functions import lit,when,col,greatest 

df\
  .withColumn("b1", eval(behavior1))\
  .withColumn("b2", eval(behavior2))\
           .select('*'
                   ,greatest(when(col('b1') == 'true',lit(behavior1_points)).otherwise(0)
                             ,when(col('b2') == 'true',lit(behavior2_points)).otherwise(0)
                             ,lit(behavior3_points)).alias('point')).drop('rule_a','rule_b','b1','b2').show()

#+---+----+------+-----+
#| ID|Name|Salary|point|
#+---+----+------+-----+
#|  1|   A|   100|    0|
#|  2|   B|   200|    0|
#|  3|   C|   300|    0|
#|  4|   D|  1000|  200|
#|  5|   E|   500|    0|
#+---+----+------+-----+
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...