Как соединить фреймы данных в моем примере? - PullRequest
0 голосов
/ 08 мая 2018

У меня есть два кадра данных:

edges =
   srcId    dstId    timestamp
   1        3        1345534569
   1        4        1346564657
   1        2        1345769687
   2        3        1345769687
   4        3        1345769687


vertices =
   id   name   s_type
   1    abc    A
   2    def    B
   3    rtf    C
   4    wrr    D

Я хочу получить фрейм данных следующей структуры (пример первой строки):

result = 

       srcId    name_src   s_type_src   dstId   name_dst   s_type_dst    timestamp
       1        abc        A            3       rtf        C             1345534569

Другими словами, я хочу добавить префикс _src к столбцам, к которым присоединяется srcId. И я хочу добавить префикс _dst к столбцам, к которым присоединяется dstId.

Вот как я решаю задачу, но я не знаю, как назначить префиксы _src и _dst именам столбцов:

val result = edges
                .join(vertices, col("srcId")===col("id"),"inner")
                .join(vertices, col("dstId")===col("id"),"inner")

1 Ответ

0 голосов
/ 08 мая 2018

Вы можете просто select столбцы с псевдонимом через as():

val edges = Seq(
  (1, 3, 1345534569),
  (1, 4, 1346564657),
  (1, 2, 1345769687),
  (2, 3, 1345769687),
  (4, 3, 1345769687)
).toDF("srcId", "dstId", "timestamp")

val vertices = Seq(
  (1, "abc", "A"),
  (2, "def", "B"),
  (3, "rtf", "C"),
  (4, "wrr", "D")
).toDF("id", "name", "s_type")

import org.apache.spark.sql.functions._

val result = edges.
  join(vertices.as("s"), $"srcId" === $"s.id", "inner").
  join(vertices.as("d"), $"dstId" === $"d.id", "inner").
  select(
    $"srcId", $"s.name".as("name_src"), $"s.s_type".as("s_type_src"),
    $"dstId", $"d.name".as("name_dst"), $"d.s_type".as("s_type_dst"),
    $"timestamp"
  )

result.show
// +-----+--------+----------+-----+--------+----------+----------+
// |srcId|name_src|s_type_src|dstId|name_dst|s_type_dst| timestamp|
// +-----+--------+----------+-----+--------+----------+----------+
// |    1|     abc|         A|    3|     rtf|         C|1345534569|
// |    1|     abc|         A|    4|     wrr|         D|1346564657|
// |    1|     abc|         A|    2|     def|         B|1345769687|
// |    2|     def|         B|    3|     rtf|         C|1345769687|
// |    4|     wrr|         D|    3|     rtf|         C|1345769687|
// +-----+--------+----------+-----+--------+----------+----------+

В качестве альтернативы, вы можете переименовать vertices столбцы соответственно, прежде чем соединять их следующим образом:

val cols = vertices.columns
val v_src = vertices.toDF(cols.map(_ + "_src"): _*)
val v_dst = vertices.toDF(cols.map(_ + "_dst"): _*)

val result = edges.
  join(v_src, $"srcId" === $"id_src", "inner").
  join(v_dst, $"dstId" === $"id_dst", "inner")
...