фрейм данных искры не может заменить значения NULL - PullRequest
0 голосов
/ 15 октября 2018

Код ниже работает нормально, но если какое-либо одно поле является NULL из 5 столбцов SAL1, SAL2, SAL3, SAL4, SAL5, соответствующее TOTAL_SALARY приходит как NULL.Похоже, что нужно создать какое-то нулевое условие или искровый udf, помогите пожалуйста.

input:

NO NAME ADDR SAL1 SAL2 SAL3 SAL4 SAL5
1  ABC  IND  100  200  300  null 400
2  XYZ  USA  200  333  209  232  444

Сумма второй записи подходит, но в первой записи из-за нуляв SAL4 вывод также становится нулевым.

from pyspark.shell import spark
from pyspark.sql import functions as F
from pyspark.sql.types import StringType
sc = spark.sparkContext

df = spark.read.option("header","true").option("delimiter", ",").csv("C:\\TEST.txt")
df.createOrReplaceTempView("table1")
df1 = spark.sql( "select * from table1" )
df2 = df1.groupBy('NO', 'NAME', 'ADDR').agg(F.sum(df1.SAL1 + df1.SAL2 + df1.SAL3 + df1.SAL4 + df1.SAL5).alias("TOTAL_SALARY"))
df2.show()

Заранее спасибо

Ответы [ 2 ]

0 голосов
/ 17 октября 2018

В основном под строкой кода проверьте все 5 полей SAL и, если оно пустое, замените его на 0. Если нет, оставьте исходное значение.

df1 = df.withColumn("SAL1", when(df.SAL1.isNull(), lit(0)).otherwise(df.SAL1))\
            .withColumn("SAL2", when(df.SAL2.isNull(), lit(0)).otherwise(df.SAL2))\
            .withColumn("SAL3", when(df.SAL3.isNull(), lit(0)).otherwise(df.SAL3))\
            .withColumn("SAL4", when(df.SAL4.isNull(), lit(0)).otherwise(df.SAL4))\
            .withColumn("SAL5", when(df.SAL5.isNull(), lit(0)).otherwise(df.SAL5))\
0 голосов
/ 15 октября 2018

Просто вставьте na.fill(0) в ваш код.Это заменит значения NULL на 0, и вы сможете выполнить операцию.

Итак, ваша последняя строка должна выглядеть следующим образом:

df2 = df1.na.fill(0).groupBy('NO', 'NAME', 'ADDR').agg(F.sum(df1.SAL1 + df1.SAL2 + df1.SAL3 + df1.SAL4 + df1.SAL5).alias("TOTAL_SALARY"))

Также кажется, что функция sum должна иметь возможность правильно обрабатывать нулевые значения.Я только что проверил следующий код:

df_new = spark.createDataFrame([
(1, 4), (2, None),  (3,None), (4,None),
(5,5), (6,None), (7,None),(1, 4), (2, 8),  (3,9), (4,1),(1, 2), (2, 1), (3,3), (4,7),
], ("customer_id", "balance"))
df_new.groupBy("customer_id").agg(sum(col("balance"))).show()
df_new.na.fill(0).groupBy("customer_id").agg(sum(col("balance"))).show()

Вывод:

+-----------+------------+
|customer_id|sum(balance)|
+-----------+------------+
|          7|        null|
|          6|        null|
|          5|           5|
|          1|          10|
|          3|          12|
|          2|           9|
|          4|           8|
+-----------+------------+
+-----------+------------+
|customer_id|sum(balance)|
+-----------+------------+
|          7|           0|
|          6|           0|
|          5|           5|
|          1|          10|
|          3|          12|
|          2|           9|
|          4|           8|
+-----------+------------+

Версия 1 содержит только значения NULL, если все значения в сумме равны NULL.Версия 2 вместо этого возвращает 0, поскольку все значения NULL заменяются на 0 *

...