Фильтрация кадра данных на основе условия когда - PullRequest
0 голосов
/ 28 мая 2020

Привет, я пытаюсь отфильтровать фрейм данных на основе условия, а затем применить схему, если она соответствует, иначе оставьте ее как есть.

val schema = ArrayType(StructType(StructField("packQty",FloatType,true):: StructField("gtin",StringType,true) :: Nil))


+--------------+---------+-----------+-----+--------------------------------------+
|orderupcnumber|enrichqty|allocoutqty|allocatedqty|gtins                                        
|
+--------------+---------+-----------+--------------------------------------------+
|5203754   |15.0     |1.0        |5.0         |[{"packQty":120.0,"gtin":"00052000042276"}]|
|5203754   |15.0     |1.0        |2.0         |[{"packQty":120.0,"gtin":"00052000042276"}|
|5243700   |25.0     |1.0        |2.0         |na                                                                      
|
+--------------+---------+-----------+------------+-------------------------------+

Я пытаюсь добавить столбец на основе схемы, если Столбец gtins не является «na», если я добавляю 0, но он выдает ошибку, говоря:

 df.withColumn("jsonData",when($"gtins"=!="na",from_json($"gtins",schema)).otherwise(0))


 Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot resolve 'CASE 
 WHEN contains(`gtins`, 'na') THEN 0 ELSE jsontostructs(`gtins`) END' due to data type 
 mismatch: THEN and ELSE expressions should all be same type or coercible to a common type;;


 df.select($"orderupcnumber",$"enrichqty",$"allocoutqty",$"allocatedqty",explode($"jsonData").as("jsonData"))


 +--------------+---------+-----------+-----+--------------+
 |orderupcnumber|enrichqty|allocoutqty|allocatedqty|gtins|JsonData
 +--------------+---------+-----------+--------------------+
 |5203754   |15.0|1.0|5.0|[{"packQty":120.0,"gtin":"00052000042276”}]|[120.0, 00052000042276]
 |5203754   |15.0|1.0 |2.0|[{"packQty":120.0,"gtin":"00052000042276”}|[120.0,00052000042276]
 |5243700   |25.0 |1.0|2.0  |na  |null
 +--------------+---------+-----------+------------+----+


 df.select($"orderupcnumber",$"enrichqty",$"allocoutqty",$"allocatedqty",$"jsonData.packQty".as("packQty"),$"jsonData.gtin".as("gtin")

этот выбор выбирает только данные, где jsonData не равно null

+---------+-----------+------------+-------+--------------+
 orderupcnumber |enrichqty|allocoutqty|allocatedqty|packQty|gtin  |
 +-----------+------------+----------------+------------+
 5203754|15.0     |1.0        |5.0         |120.0  |00052000042276|
 5203754|15.0     |1.0        |5.0         |144.0  |00052000042283|
 5243700|25.0     |1.0        |5.0         |  | |
 +-----------+------------+----------------+------------+----------

как я могу включить один и с нулевым значением.

Ответы [ 3 ]

1 голос
/ 29 мая 2020

Exception в потоке "main" org. apache .spark. sql .AnalysisException: невозможно разрешить 'CASE WHEN contains (gtins,' na ') THEN 0 ELSE jsontostructs (gtins) END 'из-за несоответствия типов данных: выражения THEN и ELSE должны быть одного типа или приводиться к общему типу;

Исправить исключение выше

Вам необходимо преобразовать na значение для json array типа для соответствия другим значениям.

Пожалуйста, проверьте код ниже.

scala> df.withColumn("gtins",when($"gtins" === "na",to_json(array($"gtins"))).otherwise($"gtins")).withColumn("jsonData",from_json($"gtins",schema)).show(false)
+-------------------------------------------+-------------------------+
|gtins                                      |jsonData                 |
+-------------------------------------------+-------------------------+
|[{"packQty":120.0,"gtin":"00052000042276"}]|[[120.0, 00052000042276]]|
|[{"packQty":120.0,"gtin":"00052000042276"}]|[[120.0, 00052000042276]]|
|["na"]                                     |null                     |
+-------------------------------------------+-------------------------+


scala> df.withColumn("gtins",when($"gtins" === "na",to_json(array($"gtins"))).otherwise($"gtins")).withColumn("jsonData",from_json($"gtins",schema)).select($"gtins",$"jsonData.packQty".as("packQty"),$"jsonData.gtin".as("gtin")).show(false)
+-------------------------------------------+-------+----------------+
|gtins                                      |packQty|gtin            |
+-------------------------------------------+-------+----------------+
|[{"packQty":120.0,"gtin":"00052000042276"}]|[120.0]|[00052000042276]|
|[{"packQty":120.0,"gtin":"00052000042276"}]|[120.0]|[00052000042276]|
|["na"]                                     |null   |null            |
+-------------------------------------------+-------+----------------+
0 голосов
/ 31 мая 2020

Проблема с вашим предложением when и else: он ожидает того же типа возврата, что и from_json, что возможно только при использовании той же функции from_json с тем же форматированием схемы

import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{ArrayType, FloatType, StringType, StructField, StructType}

object ApplySchema {

  def main(args: Array[String]): Unit = {

    val spark = Constant.getSparkSess

    import spark.implicits._

    //Create Dataframe from the List
    val sampleDf = List((5203754,15.0,1.0,5.0,"""[{"packQty":120.0,"gtin":"00052000042276"}]"""),
      (5203754,15.0,1.0,2.0,"""[{"packQty":120.0,"gtin":"00052000042276"}]""")
      ,(5203754,25.0,1.0,2.0,"na")
    ).toDF("orderupcnumber","enrichqty","allocoutqty","allocatedqty","gtins") // Map the column to data

    //JSON schema
    val schema = ArrayType(StructType(StructField("packQty",FloatType,true)::
      StructField("gtin",StringType,true) :: Nil))

    //Add column JSON parsed column "jsonData"
    sampleDf.withColumn("jsonData",
      when($"gtins"=!="na",from_json($"gtins",schema)) // Check if the value is NA then parse the JSON
      .otherwise(from_json(lit("[]"),schema))) // Else parse an empty JSON array
      .show()
  }

}

0 голосов
/ 29 мая 2020

Если ваши входные данные выглядят примерно так, как показано ниже, где несколько gtin находятся в массиве строк, вы можете сначала взорвать их, а затем применить схему и withColumn соответственно:

+--------------+---------+-----------+------------+--------------------------------------------------------------------------------------+
|orderupcnumber|enrichqty|allocoutqty|allocatedqty|gtins                                                                                 |
+--------------+---------+-----------+------------+--------------------------------------------------------------------------------------+
|5243754       |15.0     |1.0        |5.0         |[{"packQty":120.0,"gtin":"00052000042276"}, {"packQty":250.0,"gtin":"00052000012345"}]|
|5243700       |25.0     |1.0        |2.0         |[na]                                                                                  |
+--------------+---------+-----------+------------+--------------------------------------------------------------------------------------+

Затем используйте ниже: :

val schema = StructType(StructField("packQty",FloatType,true):: StructField("gtin",StringType,true) :: Nil)

df.withColumn("gtins",explode($"gtins")).withColumn("jsonData",from_json($"gtins",schema)).withColumn("packQty",$"jsonData.packQty").withColumn("gtin",$"jsondata.gtin").show(false)

+--------------+---------+-----------+------------+-----------------------------------------+-----------------------+-------+--------------+
|orderupcnumber|enrichqty|allocoutqty|allocatedqty|gtins                                    |jsonData               |packQty|gtin          |
+--------------+---------+-----------+------------+-----------------------------------------+-----------------------+-------+--------------+
|5243754       |15.0     |1.0        |5.0         |{"packQty":120.0,"gtin":"00052000042276"}|[120.0, 00052000042276]|120.0  |00052000042276|
|5243754       |15.0     |1.0        |5.0         |{"packQty":250.0,"gtin":"00052000012345"}|[250.0, 00052000012345]|250.0  |00052000012345|
|5243700       |25.0     |1.0        |2.0         |na                                       |null                   |null   |null          |
+--------------+---------+-----------+------------+-----------------------------------------+-----------------------+-------+--------------+
...