Если вам необходимо отобразить ваши столбцы в natural numbers
, начиная с 1, один из подходов заключается в применении zipWithIndex
к отдельным столбцам, добавлении 1 к индексу (так как zipWithIndex
всегда начинается с 0), преобразовании отдельных СДР.в DataFrames и, наконец, присоедините преобразованные DataFrames для индексных ключей:
val rdd = sc.parallelize(Seq(
("abc", "123a"),
("def", "783b"),
("abc", "674b"),
("xyz", "123a"),
("abc", "783b")
))
val df1 = rdd.map(_._1).distinct.zipWithIndex.
map(r => (r._1, r._2 + 1)).
toDF("col1", "c1key")
val df2 = rdd.map(_._2).distinct.zipWithIndex.
map(r => (r._1, r._2 + 1)).
toDF("col2", "c2key")
val dfJoined = rdd.toDF("col1", "col2").
join(df1, Seq("col1")).
join(df2, Seq("col2"))
// +----+----+-----+-----+
// |col2|col1|c1key|c2key|
// +----+----+-----+-----+
// |783b| abc| 2| 1|
// |783b| def| 3| 1|
// |123a| xyz| 1| 2|
// |123a| abc| 2| 2|
// |674b| abc| 2| 3|
//+----+----+-----+-----+
dfJoined.
select($"c1key".as("col1"), $"c2key".as("col2")).
show
// +----+----+
// |col1|col2|
// +----+----+
// | 2| 1|
// | 3| 1|
// | 1| 2|
// | 2| 2|
// | 2| 3|
// +----+----+
Обратите внимание, что если у вас все в порядке с ключами, начинающимися с 0, шаг map(r => (r._1, r._2 + 1))
можно пропустить при генерации df1
и df2
.