как добавить условие, чтобы, когда в кадре данных был какой-то столбец, добавлялось только соответствующее условие «когда» - PullRequest
0 голосов
/ 14 марта 2020

Я использую spark- sql -2.4.1v с java8. У меня есть сценарий, где мне нужно добавить условие когда для столбца, если этот столбец существует в соответствующем кадре данных. Как это можно сделать?

Пример:

val df = ...// may contain columns either abc, x or y or both....depend on some business logic.

val result_df =  df
                  .withColumn("new_column", when(col("abc") === "a" , concat(col("x"),lit("_"),col("y"))))

// здесь проблема в том, что иногда df может не содержать / извлекать столбец "x", тогда он должен давать "y" "значение в result_df. Но в приведенном выше операторе выдается ошибка, поскольку столбец «x» не присутствует в df в этой точке.

Так как проверить, представляет ли столбец (то есть «x») использование в concat () else go с оставшимися столбцами (т. Е. «Y»)

Здесь также возможно и наоборот, т. Е. В столбце df присутствует только col (x), но не столбец («y»). В некоторых случаях оба столбца x, y доступны в df, тогда он работает нормально.

Вопрос. как добавить условие в условии when и когда столбец представлен в кадре данных. ?

одна поправка, о которой идет речь. Если некоторых столбцов нет, я не должен go переходить в это условие withColumn.

Пример:

Если представлен столбец x:

val result_df =  df
                  .withColumn("new_x", when(col("abc") === "a" , concat(col("x"))))

Если представлен столбец x:

val result_df =  df
                  .withColumn("new_y", when(col("abc") === "a" , concat(col("y"))))

Если оба столбца x и y подарки:

val result_df =  df
               .withColumn("new_x", when(col("abc") === "a" , concat(col("x"))))
                .withColumn("new_y", when(col("abc") === "a" , concat(col("y"))))
                  .withColumn("new_x_y", when(col("abc") === "a" , concat(col("x"),lit("_"),col("y"))))

Ответы [ 2 ]

1 голос
/ 15 марта 2020

Этого можно добиться, динамически создав список столбцов, используя свойство columns и простой оператор if Scala / Java. Список должен включать или не включать targetColumn в зависимости от того, были ли найдены столбцы или нет в схеме кадра данных (scala код):

import org.apache.spark.sql.functions.{col, concat_ws}

// the column we should check for. Change this accordingly to your requirements i.e "y"
val targetColumn = "x"
var concatItems = Seq(col("y"))

// add targetColumn if found in df.columns
if (df.columns.contains(targetColumn))
  concatItems = concatItems :+ col(targetColumn)

df.withColumn("new_column", when(col("abc") === "a", concat_ws("_", concatItems:_*)))

Обратите внимание, что вместо contact мы используем concat_ws поскольку он будет проверяться автоматически, в то время как contactItems содержит один или несколько элементов и применяет разделитель _ соответственно.

Обновление:

Вот новый обновленный код, использующий select утверждение:

var selectExpr = null

if(df.columns.contains("x") && df.columns.contains("y"))
   selectExpr = Seq(
         when(col("abc") === "a", col("x")).as("new_x"), 
         when(col("abc") === "a", col("y")).as("new_y"),
         when(col("abc") === "a", concat_ws("_", col("x"), col("y"))).as("new_x_y")
   )
else if(df.columns.contains("y"))
   selectExpr =  when(col("abc") === "a", col("y")).as("new_y")
else
   selectExpr =  when(col("abc") === "a", col("x")).as("new_x")

df.select(selectExpr:_*)  

Обратите внимание, что нам не нужно использовать withColumn, select - это именно то, что вам нужно для вашего случая.

1 голос
/ 15 марта 2020

Вы должны сделать это с помощью управления потоком ваших языков, например, в python / PySpark с операторами if, else.

Причина в том, что df-функции Spark работают со столбцами, поэтому вы не можете применять условие проверки имен .when (), оно только просматривает значения в столбцах и применяет логику / условие по строкам.

Например, для F.when (col (x) == col (y)), spark переведет его в Java, где он будет применять эти логические значения c по часовой стрелке в двух столбцах.

Это также имеет смысл, если вы думаете, что Spark dfs состоят из объектов строк, поэтому он отправляет условие на накопитель, чтобы применить это условие к каждому объекту (строке), который выглядит следующим образом [ Row (x = 2), Row (y = 5)].

def check_columns(df, col_x, col_y, concat_name):
    '''
    df: spark dataframe
    col_x & col_y: the two cols to concat if both present
    concat_name: name for new concated col
    -----------------------------------------------------
    returns: df with new concated col if oth x & y cols present
    otherwise if returns df with x or y col if only on present 
    '''
    cols = list(col_x) + list(col_y)
    if all(item in df.columns for item in cols) 
    df = df.withColumn(concat_name, concat(col(col_x),lit("_"),col(col_y)))
    return df

Необходимо применять действие только в том случае, если оба x & y присутствуют так, как будто только один, он возвращает df с существующим x или y col в любом случае.

Я бы применил что-то подобное выше, сохранив как функцию для повторного использования.

То, что вы могли бы сделать с .when (), это только значения concat, где условие встретив построчно, это даст вам столбец со значениями, конкаталированными при выполнении условия.

df.when('concat_col', F.when( F.col('age') < F.lit('18'), 
                      concat(F.col('name'), F.lit('_underAge'))
                      .otherwise(F.col('name'),F.lit('_notUnderAge')))

Надеюсь, это поможет!

...