Добавить новый столбец в Spark DF на основе логики - PullRequest
0 голосов
/ 04 июня 2018

Необходимо добавить новый столбец ниже DF на основе других столбцов.Вот схема DF

scala> a.printSchema()
root
 |-- ID: decimal(22,0) (nullable = true)
 |-- NAME: string (nullable = true)
 |-- AMOUNT: double (nullable = true)
 |-- CODE: integer (nullable = true)
 |-- NAME1: string (nullable = true)
 |-- code1: array (nullable = true)
 |    |-- element: integer (containsNull = true)
 |-- revised_code string (nullable = true)

Теперь я хочу добавить флаг скажем столбца в соответствии с условиями ниже

 1=> if code == revised_code, than flag is P
 2 => if code != revised code than I  
 3=> if both code and revised_code is null than no flag.

это udf, который я пытаюсь, но он дает I для обоих случаев 1 и 3.

 def tagsUdf =
    udf((code: String, revised_code: String) =>
      if (code == null  && revised_code == null ) ""
      else if (code == revised_code) "P" else "I")


tagsUdf(col("CODE"), col("revised_code"))

Кто-нибудь может указать, какую ошибку я совершаю

I/P DF
+-------------+-------+------------+
|NAME         |   CODE|revised_code|
+-------------+-------+------------+
|       amz   |   null|       null|
|   Watch     |   null|       5812|
|   Watch     |   null|       5812|
|   Watch     |   5812|       5812|
|       amz   |   null|       null|
|   amz       | 9999  |       4352|
+-------------+-------+-----------+
Schema:
root
 |-- MERCHANT_NAME: string (nullable = true)
 |-- CODE: integer (nullable = true)
 |-- revised_mcc: string (nullable = true)

O/P DF    
+-------------+-------+-----------------+
|NAME         |   CODE|revised_code|flag|
+-------------+-------+-----------------+
|   amz       |   null|       null| null|
|   Watch     |   null|       5812|  I  |
|   Watch     |   null|       5812|  I  |
|   Watch     |   5812|       5812|  P  |
|   amz       |   null|       null| null|
|amz          | 9999  |       4352|  I  |
+-------------+-------+-----------------+

1 Ответ

0 голосов
/ 04 июня 2018

Вам не нужна функция udf для этого.Простая вложенная when встроенная функция должна сработать.

import org.apache.spark.sql.functions._
df.withColumn("CODE", col("CODE").cast("string"))
  .withColumn("flag", when(((isnull(col("CODE")) || col("CODE") === "null") && (isnull(col("revised_code")) || col("revised_code") === "null")), "").otherwise(when(col("CODE") === col("revised_code"), "P").otherwise("I")))
  .show(false)

Здесь столбец CODE приводится к stringType перед применением логики, когда используется так, чтобы оба CODE и revised_code соответствовалив типе данных при сравнении.

Примечание: CODE столбец является IntegerType, и он не может быть нулевым в любом случае.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...