Объединить столбец фрейма искровых данных со строками в Scala - PullRequest
0 голосов
/ 08 октября 2018

Я пытаюсь построить строку путем объединения значений из фрейма данных.Например:

val df = Seq(
  ("20181001","10"),     
  ("20181002","40"),
  ("20181003","50")).toDF("Date","Key")
df.show

Вывод DF, как показано ниже.

enter image description here

Здесь я хочу построить условие на основезначения фрейма данных, такие как: (Дата = 20181001 и ключ = 10) или (Дата = 20181002 и ключ = 40) или (Дата = 20181003 и ключ = 50) и т. д. Сгенерированное условие будет служитьввод для другого процесса.Здесь столбцы во фрейме данных могут быть динамическими.

Приведенный ниже фрагмент кода, который я пробую, формирует строку по мере необходимости, но является статической. Также не очень уверен, как она будет работать, когда мне нужно будет сгенерировать условие для более чем 10 столбцов.Любое предложение высоко ценится.

val df = Seq(
  ("20181001","10"),     
  ("20181002","40"),
  ("20181003","50")).toDF("Date","Key")

val colList = df.columns
var cond1 = ""
var finalCond =""
for (row <- df.rdd.collect)
 {
    cond1 = "("
    var pk = row.mkString(",").split(",")(0)
    cond1 = cond1+colList(0)+"="+pk
    var ak = row.mkString(",").split(",")(1)
    cond1 = cond1 +" and " + colList(1)+ "=" +ak +")"
    finalCond = finalCond + cond1 + " or " 
    cond1= ""    
 }
 print("Condition:" +finalCond.dropRight(3))

enter image description here

Ответы [ 3 ]

0 голосов
/ 08 октября 2018

Проверьте это решение DF.

scala> val df = Seq(
       |   ("20181001","10"),
       |   ("20181002","40"),
       |   ("20181003","50")).toDF("Date","Key")
df: org.apache.spark.sql.DataFrame = [Date: string, Key: string]

scala> val df2 = df.withColumn("gencond",concat(lit("(Date="), 'Date, lit(" and Key=") ,'Key,lit(")")))
df2: org.apache.spark.sql.DataFrame = [Date: string, Key: string ... 1 more field]


scala> df2.agg(collect_list('gencond)).show(false)
+------------------------------------------------------------------------------------+
|collect_list(gencond)                                                               |
+------------------------------------------------------------------------------------+
|[(Date=20181001 and Key=10), (Date=20181002 and Key=40), (Date=20181003 and Key=50)]|
+------------------------------------------------------------------------------------+

EDIT1

Вы можете прочитать их из файлов паркета и просто изменить названия, как в этом решении.На последнем этапе снова замените имена из заголовка паркета.Проверьте это.

scala> val df = Seq(("101","Jack"),("103","wright")).toDF("id","name")  // Original names from parquet
df: org.apache.spark.sql.DataFrame = [id: string, name: string]

scala> val df2= df.select("*").toDF("Date","Key")  // replace it with Date/Key as we used in this question
df2: org.apache.spark.sql.DataFrame = [Date: string, Key: string]

scala> val df3 = df2.withColumn("gencond",concat(lit("(Date="), 'Date, lit(" and Key=") ,'Key,lit(")")))
df3: org.apache.spark.sql.DataFrame = [Date: string, Key: string ... 1 more field]

scala> val df4=df3.agg(collect_list('gencond).as("list"))
df4: org.apache.spark.sql.DataFrame = [list: array<string>]

scala> df4.select(concat_ws(" or ",'list)).show(false)
+----------------------------------------------------+
|concat_ws( or , list)                               |
+----------------------------------------------------+
|(Date=101 and Key=Jack) or (Date=103 and Key=wright)|
+----------------------------------------------------+

scala> val a = df.columns(0)
a: String = id

scala> val b = df.columns(1)
b: String = name

scala>  df4.select(concat_ws(" or ",'list).as("new1")).select(regexp_replace('new1,"Date",a).as("colx")).select(regexp_replace('colx,"Key",b).as("colxy")).show(false)
+--------------------------------------------------+
|colxy                                             |
+--------------------------------------------------+
|(id=101 and name=Jack) or (id=103 and name=wright)|
+--------------------------------------------------+


scala>
0 голосов
/ 08 октября 2018

Используя udf вы можете сделать для переменной число columns, как показано ниже

val list=List("Date","Key")

def getCondString(row:Row):String={
    "("+list.map(cl=>cl+"="+row.getAs[String](cl)).mkString(" and ")+")"
  }

val getCondStringUDF=udf(getCondString _)
df.withColumn("row", getCondStringUDF(struct(df.columns.map(df.col(_)):_*))).select("row").rdd.map(_(0).toString()).collect().mkString(" or ")
0 голосов
/ 08 октября 2018

Вызов функции сбора данных возвращает результат в программу драйвера, поэтому, если у вас огромный DataFrame, вам может не хватить памяти.

Если вы уверены, что имеете дело только с небольшим числом строк, которое неЭто не проблема.

Вы можете сделать что-то вроде:

df.map(row => s"($Date={row.getString(0)} and Key=${row.getString(1)})").collect.mkString("Condition: ", " or ", "")

Вывод:

res2: String = Condition: (Date=20181001 and Key=10) or (Date=20181002 and Key=40) or (Date=20181003 and Key=50)
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...