Создать новый столбец Dataframe на основе существующих Dataframe в Spark - PullRequest
0 голосов
/ 30 мая 2018

Есть два DF, мне нужно заполнить новый столбец в DF1 скажем Flag при следующих условиях.

DF1
    +------+-------------------+
    ||AMOUNT|Brand             | 
    +------+-------------------+
    | 47.88|          Parle    |
    | 40.92|          Parle    |
    | 83.82|          Parle    |
    |106.58|          Parle    |
    | 90.51|          Flipkart |
    | 11.48|          Flipkart |
    | 18.47|          Flipkart |
    | 40.92|          Flipkart |
    |  30.0|          Flipkart |
    +------+-------------------+


DF2

+--------------------+-------+----------+
|       Brand        |   P1  |   P2     |
+--------------------+-------+----------+
|               Parle| 37.00 |  100.15  |
|            Flipkart|  10.0 |  30.0    |
+--------------------+-------+----------+

, если сумма, скажем, Бренда Parle в DF1 меньше значения P1 (Amount < P1) в DF2 для бренда "Parle", тогда флаг будет low, если P1 >= amount <= P2, то флаг будет mid, а если Amount > P2, то high, аналогично для других продавцов.

DF1 имеет очень большие данные, а DF2 очень маленький.

expected output

+------+-------------------+----------------+
||AMOUNT|Brand             |        Flag    |
+------+-------------------+----------------+
| 47.88|          Parle    |   mid          |
| 40.92|          Parle    |   mid          |
| 83.82|          Parle    |   mid          |
|106.58|          Parle    |   high         |
| 90.51|          Flipkart |   high         |
| 11.48|          Flipkart |   mid          |
| 18.47|          Flipkart |   mid          |
| 40.92|          Flipkart |   high         |
|  30.0|          Flipkart |   mid          |
+------+-------------------+----------------

Я знаю, что могу сделать соединение и получить результаты, но как мне соответствовать логике в искре.

Ответы [ 2 ]

0 голосов
/ 30 мая 2018

Я думаю, что с udf также выполнимо.

val df3 = df1.join(df2, Seq("Brand"), "left")
import org.apache.spark.sql.functions._
val mapper = udf((amount: Double, p1: Double, p2: Double) => if (amount < p1) "low" else if (amount > p2) "high" else "mid")
df3.withColumn("Flag", mapper(df3("AMOUNT"), df3("P1"), df3("P2")))
    .select("AMOUNT", "Brand", "Flag")
    .show(false)
0 голосов
/ 30 мая 2018

Простые left объединенные и вложенные when встроенные функции должны получить желаемый результат в виде

import org.apache.spark.sql.functions._
df1.join(df2, Seq("Brand"), "left")
  .withColumn("Flag", when(col("AMOUNT") < col("P1"), "low").otherwise(
    when(col("AMOUNT") >= col("P1") && col("AMOUNT") <= col("P2"), "mid").otherwise(
      when(col("AMOUNT") > col("P2"), "high").otherwise("unknown"))))
    .select("AMOUNT", "Brand", "Flag")
  .show(false)

, что должно дать вам

+------+--------+----+
|AMOUNT|Brand   |Flag|
+------+--------+----+
|47.88 |Parle   |mid |
|40.92 |Parle   |mid |
|83.82 |Parle   |mid |
|106.58|Parle   |high|
|90.51 |Flipkart|high|
|11.48 |Flipkart|mid |
|18.47 |Flipkart|mid |
|40.92 |Flipkart|high|
|30.0  |Flipkart|mid |
+------+--------+----+

Надеюсь, ответ полезен

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...