Scala UDF - объединяет значения столбцов в определенном форматировании - PullRequest
0 голосов
/ 05 сентября 2018

DF1 - это то, что у меня сейчас, и я хочу, чтобы DF1 выглядел как DF2.

Желаемый вывод:

 DF1                                                           DF2
+---------+----------------------------------------+          +---------+-------------------------------------------------------------------+
|   ID    |         Category                       |          |   ID    |                  category_name                                    |
+---------+----------------------------------------+          +---------+-------------------------------------------------------------------+  
|  31898  |   CP Bill Payment                      |          |  31898  |  CP Bill Payment + CP e-Transfer + CP IMT (CPS Limit + CPS Payee) |  
|  31898  |   CP e-Transfer + CP IMT               |          |  32614  |  CP Bill Payment + CP e-Transfer + CP Other Transfer (CPS Blocked)|
|  31898  |   CPS Limit + CPS Payee                |          |  35431  |  CP Bill Payment + CP e-Transfer                                  |
|  32614  |   CP e-Transfer + CP Other Transfer    |          |  33987  |  CP IMT (CPS Limit)                                               |
|  32614  |   CP Bill Payment                      |  =====>  |  35672  |  CPS Blocked                                                      |
|  32614  |   CPS Blocked                          |  =====>  |  37612  |  CPS Blocked + CPS Stop/Cancel/Reverse                            |
|  35431  |   CP e-Transfer                        |          +---------+-------------------------------------------------------------------+
|  35431  |   CP Bill Payment                      |
|  33987  |   CP IMT                               |
|  33987  |   CPS Limit                            |
|  35672  |   CPS Blocked                          |
|  37612  |   CPS Blocked + CPS Stop/Cancel/Reverse|
+---------+----------------------------------------+

У меня есть код ниже:

val DF2 = DF1.groupBy("ID").agg(collect_set("Category").as("CategorySet"))
.groupBy("ID")
.agg(collect_set("Category").as("CategorySet"))
.withColumn( "category_name",
  when(array_contains($"CategorySet", "CP Bill Payment") && array_contains($"CategorySet", "CP e-Transfer + CP IMT") && array_contains($"CategorySet", "CPS Limit + CPS Payee"), "CP Bill Payment + CP e-Transfer + CP IMT (CPS Limit + CPS Payee)").otherwise("---other conditions---"))
.select("ID","category_name")

Логика такова, что для одного и того же идентификатора, 31898/32614/33987: если содержит CP * и CPS *, это должен быть CP * (CPS *) или CP * + CP * (CPS *); 35431: если в массиве нет CPS *, просто используйте + для объединения всех элементов в массиве; 35672/37612: в противном случае, просто элемент в массиве. Кстати, категория должна быть отсортирована по возрастанию.

Код работает, просто слишком много возможных комбинаций. Как использовать UDF, чтобы сделать то же самое? Или есть какая-нибудь встроенная функция, способная сделать это? Заранее спасибо

Ответы [ 2 ]

0 голосов
/ 05 сентября 2018

Что я могу думать сейчас:

//UDF
def mapColumn(col: String) = udf { (xs: Seq[String]) => 
                        xs.map { x => 
                          if (x.contains(col+" ")) x else null
                        }.filter(_ != null).mkString(" + ")
                     }

import org.apache.spark.sql.functions._
val df1 = df.groupBy("Id").agg(
                               mapColumn("CP")(sort_array(collect_set("Category"))).as("CategorySetCP"),
                               mapColumn("CPS")(sort_array(collect_set("Category"))).as("CategorySetCPS")
                               ).withColumn("CategorySetCPS_New",concat(lit(" ("),'CategorySetCPS,lit(")")))
                               .withColumn("category_name",
                                           when(length($"CategorySetCP") > 0 and length($"CategorySetCPS") > 0,concat($"CategorySetCP",$"CategorySetCPS_New")).
                                           otherwise(when(length($"CategorySetCP") >0 and length($"CategorySetCPS") === 0,$"CategorySetCP").
                                           otherwise($"CategorySetCPS"))
                                           )
           .select('Id,'category_name)

df1.show(false)

Ouput:

+-----+-----------------------------------------------------------------+
|Id   |category_name                                                    |
+-----+-----------------------------------------------------------------+
|33987|CP IMT (CPS Limit)                                               |
|32614|CP Bill Payment + CP e-Transfer + CP Other Transfer (CPS Blocked)|
|35672|CPS Blocked                                                      |
|35431|CP Bill Payment + CP e-Transfer                                  |
|31898|CP Bill Payment + CP e-Transfer + CP IMT (CPS Limit + CPS Payee) |
|35612|CPS Blocked + CPS Stop/Cancel/Reverse                            |
+-----+-----------------------------------------------------------------+       

Надеюсь, это поможет!

0 голосов
/ 05 сентября 2018

Это пример того, как использовать UDAF. Очевидно, вам не нужен UDAF для объединения значений столбцов по id, но это позволяет добавить больше логики. Например, чтобы объединить значения по полю ID, вы можете создать UDAF, например:

class ConcatenateStrings extends UserDefinedAggregateFunction {
  override def inputSchema: StructType = StructType(StructField("input", StringType) :: Nil)

  override def bufferSchema: StructType = StructType(StructField("pair", StringType) :: Nil)

  override def dataType: DataType = StringType

  override def deterministic: Boolean = true

  override def initialize(buffer: MutableAggregationBuffer): Unit = buffer(0) = ""

  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
      val b = buffer.getAs[String](0)
      val i = input.getAs[String](0)
      buffer(0) = { if(b.isEmpty) b + i else b + " + " + i }
  }

  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    val b1 = buffer1.getAs[String](0)
    val b2 = buffer2.getAs[String](0)
    if(!b1.isEmpty)
      buffer1(0) = (b1) ++ " + " ++ (b2)
    else
      buffer1(0) = b2
  }

  override def evaluate(buffer: Row): Any = {
    val yourString = buffer.getAs[String](0)
    // Compute your logic and return another String
    yourString + "@procesed"
  }
}

Затем вы можете включить в свой вызов агрегации:

object testAppl0 {

  def main(args: Array[String]) : Unit = {

    val agg0 = new ConcatenateStrings()

    implicit val spark: SparkSession =
      SparkSession
        .builder()
        .appName("Test")
        .master("local[1]")
        .getOrCreate()

    import spark.implicits._

    val rows = Seq(Row(31898,"CP Bill Payment"), Row(31898,"CP e-Transfer + CP IMT"), Row(31898,"CPS Limit + CPS Payee "))

    val schema = List(
      StructField("ID", IntegerType, true),
      StructField("Category", StringType, true))

    val df =  spark.createDataFrame(
      spark.sparkContext.parallelize(rows),
      StructType(schema)
    )

    df.groupBy("ID").agg(agg0($"Category")).show(false)

  }
}

Будет возвращен новый столбец "concatenatestrings (Category)":

+-----+--------------------------------------------------------------------------+
|ID   |concatenatestrings(Category)                                              |
+-----+--------------------------------------------------------------------------+
|31898|CP Bill Payment + CP e-Transfer + CP IMT + CPS Limit + CPS Payee @procesed|
+-----+--------------------------------------------------------------------------+

Проверьте это, может быть, это может помочь

...