Динамическое создание имен столбцов, чтобы передать их для выбора при создании фрейма Spark Data - PullRequest
0 голосов
/ 04 февраля 2019

Я работаю в Spark и Scala последние 2 месяца, и я новичок в этой технологии.Я выделил выбранные столбцы (с помощью regexp_replace) как List [String] () и передал для создания фрейма Spark Data и его выдачи мне ошибку как «Не удается разрешить».Ниже приведены шаги, которые я выполнил и попробовал.

Определение значения:

Defining the column which I would like to identify in the src data frame
val col_name = "region_id"
Defining the column which will be used to replace the src data frame column from ref data frame
val surr_key_col_name = "surrogate_key"

Я создал два кадра данных, как показано ниже

src_df

region id | region_name | region_code  
10001189  | Spain       |    SP09 8545  
10001765  | Africa      |    AF97 6754  

ref_df

region id | surrogate_key  
1189      |    2345  
1765      |    8978  

val src_df = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load("s3://bucket/src_details.csv")
val ref_df = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load("s3://bucket/ref_details.csv")

Я перебираю, чтобы определить столбец, который мне нужен для использования reg match изамените другим значением столбца Фрейма данных и сохраните его в Списке, чтобы передать его в Фреймы данных. Выберите

val src_header_rec = src_df.columns.toList

//Loop through source file header to identify the region_id and replace it with surrogate_id by doing a pattern match( I don't want to replace the 
for (src_header_cols <- src_header_rec) {
if (col_name == src_header_cols) {
src_column_names :+="regexp_replace("+"$"+s""""src.$src_header_cols""""+","+"$"+s""""ref.$src_header_cols""""+","+"$"+s""""ref.$surr_key_col_name""""+")"+".as("+s""""$src_header_cols""""+")"
}
else {
src_column_names :+= "src."+src_header_cols
}
}

После построения столбца выбора в Списке [String] () с помощью цикла for выше, япередача его в столбцы выбора для создания final_df

val final_df = src_df.alias("src").join(ref_df.alias("ref"), src_df(col_name)=== ref_df(col_name),"left_outer").select(src_column_names.head,src_column_names.tail:_*)

Если я передам столбцы напрямую, не используя List [String] () в выделенном фрейме данных, моя замена regexp_replace работает

val final_df = src_df.alias("src").join(ref_df.alias("ref"), src_df(col_name)=== ref_df(col_name),"left_outer").select(regexp_replace($"src.region_id",$"ref.region_id",$"ref.surrogate_key").as("region_id"))

Я не уверен, почему он не работает, когда я передаю его как List [String] ()

Когда я удаляю подстановку regexp_replace в цикле for и передаю ее как List [String]() для выбора фрейма данных он работает правильно, как показано ниже:

Этот код очень хорошо работает с выбором фрейма данных:

for (src_header_cols <- src_header_rec) {
if (col_name == src_header_cols) {
src_column_names :+= "ref."+surr_key_col_name
}
else {
src_column_names :+= "src."+src_header_cols
}
}

val final_df = src_df.alias("src").join(ref_df.alias("ref"), src_df(col_name)===ref_df(col_name),"left_outer").select(src_column_names.head,src_column_names.tail:_*)

Фрейм данных результата / вывода, который я пытаюсь получить, равен

final_df

region id    | region_name | region_code  
1000**2345** | Spain       |  SP09 8545  
1000**8978** | Africa      |  AF97 6754  

Итак, когда я пытаюсь построить Spark Data Frame, выберите в цикле for с regexp_replace в качестве List и используйте его, что выдает ошибку «Не удается разрешить».

1 Ответ

0 голосов
/ 08 февраля 2019

Я попытался создать временное представление фрейма данных и использовал то же регулярное выражение в операторе выбора временного представления.Это сработало.Пожалуйста, найдите ниже код, который я пробовал, и он работал.

//This for loop will scan through my header list and whichever column matches it frames regexp for those columns.So, the region_id from the Data Frame header matches the variable value that I have defined.
for (src_header_cols <- src_header_rec) {
    if (col_name == src_header_cols) {

        src_column_names :+= "regexp_replace(src."+s"$src_header_cols"+",ref."+s"$ref_col_name"+",ref."+s"$surr_key_col_name"+")"+s" $src_header_cols"
    }
    else {
        src_column_names :+= "src."+src_header_cols
    }
} 

//Creating Temporary view to apply SQL queries on it
src_df.createOrReplaceTempView("src")
ref_df.createOrReplaceTempView("ref")

//Framing SQL statements to be passed while querying
val selectExpr_1 = "select "+src_column_names.mkString(",")
val selectExpr_2 = selectExpr_1+" from src left outer join ref on(src."+s"$col_name"+" = ref."+s"$ref_col_name"+")"

// Creating a final Data Frame using the SQL statement created
val src_policy_masked_df = spark.sql(s"$selectExpr_2")
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...