Вы можете просто select
столбцы с псевдонимом через as()
:
val edges = Seq(
(1, 3, 1345534569),
(1, 4, 1346564657),
(1, 2, 1345769687),
(2, 3, 1345769687),
(4, 3, 1345769687)
).toDF("srcId", "dstId", "timestamp")
val vertices = Seq(
(1, "abc", "A"),
(2, "def", "B"),
(3, "rtf", "C"),
(4, "wrr", "D")
).toDF("id", "name", "s_type")
import org.apache.spark.sql.functions._
val result = edges.
join(vertices.as("s"), $"srcId" === $"s.id", "inner").
join(vertices.as("d"), $"dstId" === $"d.id", "inner").
select(
$"srcId", $"s.name".as("name_src"), $"s.s_type".as("s_type_src"),
$"dstId", $"d.name".as("name_dst"), $"d.s_type".as("s_type_dst"),
$"timestamp"
)
result.show
// +-----+--------+----------+-----+--------+----------+----------+
// |srcId|name_src|s_type_src|dstId|name_dst|s_type_dst| timestamp|
// +-----+--------+----------+-----+--------+----------+----------+
// | 1| abc| A| 3| rtf| C|1345534569|
// | 1| abc| A| 4| wrr| D|1346564657|
// | 1| abc| A| 2| def| B|1345769687|
// | 2| def| B| 3| rtf| C|1345769687|
// | 4| wrr| D| 3| rtf| C|1345769687|
// +-----+--------+----------+-----+--------+----------+----------+
В качестве альтернативы, вы можете переименовать vertices
столбцы соответственно, прежде чем соединять их следующим образом:
val cols = vertices.columns
val v_src = vertices.toDF(cols.map(_ + "_src"): _*)
val v_dst = vertices.toDF(cols.map(_ + "_dst"): _*)
val result = edges.
join(v_src, $"srcId" === $"id_src", "inner").
join(v_dst, $"dstId" === $"id_dst", "inner")