Spark Соедините один и тот же набор данных несколько раз в разных столбцах - PullRequest
0 голосов
/ 18 ноября 2018

У меня ниже двух наборов данных.


code,name
IN,India
US,United States
UK,United Kingdom
SG,Singapore 

id,name,code1,code2,code3
1,abc,UK,SG,US
2,efg,SG,UK,US

Можем ли мы объединить code1, code2 и code3 с первым набором данных и получить имя для каждого столбца?


id,name,code1desc,code2desc,code3desc
1,abc,United Kingdom,Singapore,United States
2,efg,Singapore,United Kingdom,United States

Соединение с первым столбцом работает, однако сбой второго столбца.

Dataset<Row> code1 = people.join(countries, people.col("code1").equalTo(countries.col("code")),"left_outer").withColumnRenamed("name","code1desc");
    code1.show();

Код ниже не работает:

Dataset<Row> code2 = code1.join(countries, code1.col("code2").equalTo(countries.col("code")),"left_outer");
    code2.show();

Ответы [ 2 ]

0 голосов
/ 18 ноября 2018

Вы можете использовать udf, предполагая, что ваш код данных страны достаточно мал. Сначала мы соберем коды в карту, а затем применим udf к каждому столбцу кода.

code_df - ваш фрейм данных кода страны, а data_df - ваши данные.

import org.apache.spark.sql.functions._

val mapcode = code_df.rdd.keyBy(row => row(0)).collectAsMap()
println("Showing 10 rows of mapcode")

for ((k,v) <- mapcode) {
  printf("key: %s, value: %s\n", k, v)
}


def getCode( code: String ) : String = {
  val desc = mapcode(code).getAs[String](1)
  return desc
}

val getcode_udf = udf(getCode _)

val newdatadf = data_df.withColumn("code1desc", getcode_udf($"code1"))
  .withColumn("code2desc", getcode_udf($"code2"))
  .withColumn("code3desc", getcode_udf($"code3"))

println("Showing 10 rows of final result")
newdatadf.show(10, truncate = false)

Ниже приведен результат:

Showing 10 rows of mapcode
key: IN, value: [IN,India]
key: SG, value: [SG,Singapore]
key: UK, value: [UK,United Kingdom]
key: US, value: [US,United States]
Showing 10 rows of final result
+---+----+-----+-----+-----+--------------+--------------+-------------+
|id |name|code1|code2|code3|code1desc     |code2desc     |code3desc    |
+---+----+-----+-----+-----+--------------+--------------+-------------+
|1  |abc |UK   |SG   |US   |United Kingdom|Singapore     |United States|
|2  |efg |SG   |UK   |US   |Singapore     |United Kingdom|United States|
+---+----+-----+-----+-----+--------------+--------------+-------------+
0 голосов
/ 18 ноября 2018

Для каждого столбца "code [i]" каждого народа требуется объединение со странами, может быть выполнено в цикле, в Scala:

// data 
val countries = List(
  ("IN", "India"),
  ("US", "United States"),
  ("UK", "United Kingdom"),
  ("SG", "Singapore")
).toDF("code", "name")

val people = List(
  (1, "abc", "UK", "SG", "US"),
  (2, "efg", "SG", "UK", "US")
).toDF("id", "name", "code1", "code2", "code3")

// action
val countryColumns = List("code1", "code2", "code3")
val result = countryColumns.foldLeft(people)((people, column) =>
  people.alias("p")
    .join(countries.withColumnRenamed("name", column + "desc").alias("c"),
      col("p." + column) === $"c.code",
      "left_outer")
    .drop(column, "code")
)

Результат:

+---+----+--------------+--------------+-------------+
|id |name|code1desc     |code2desc     |code3desc    |
+---+----+--------------+--------------+-------------+
|1  |abc |United Kingdom|Singapore     |United States|
|2  |efg |Singapore     |United Kingdom|United States|
+---+----+--------------+--------------+-------------+

Примечание: если размер кадра данных "стран" небольшой, для повышения производительности можно использовать широковещательное соединение.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...