PYSPARK: объединить столбец таблицы с одним из двух столбцов из другой таблицы - PullRequest
0 голосов
/ 20 ноября 2018

Моя проблема заключается в следующем:

Table 1
ID1 ID2
 1  2 
 3  4

Table 2
C1    VALUE
 1    London
 4    Texas

Table3 
 C3    VALUE
  2     Paris
  3     Arizona

В таблице 1 указаны первичные и вторичные идентификаторы.Мне нужно создать окончательный вывод, который представляет собой агрегацию значений из таблицы 2 и таблицы 3 на основе сопоставления идентификаторов из таблицы 1.

, т. Е. Если значение в таблице 2 или таблице 3 сопоставлено с любым из идентификаторов, оно должно быть объединено в единое целое..

i.e my final output should look like:

ID  Aggregated
1  [2, London, Paris] // since Paris is mapped to 2 which is turn is mapped to 1
3  [4, Texas, Arizona] // Texas is mapped to 4 which in turn is mapped to 3

Любые предложения, как этого добиться в pyspark.

Я не уверен, что присоединение к таблицам поможет в этой проблеме.

Я думалPairedRDD может помочь мне в этом, но я не могу найти правильное решение.

Спасибо

1 Ответ

0 голосов
/ 20 ноября 2018

Ниже приведен очень простой подход:

spark.sql(
"""
  select 1 as id1,2 as id2 
  union
  select 3 as id1,4 as id2 
""").createOrReplaceTempView("table1")

spark.sql(
"""
  select 1 as c1, 'london' as city 
  union
  select 4 as c1, 'texas' as city 
""").createOrReplaceTempView("table2")

spark.sql(
"""
  select 2 as c1, 'paris' as city 
  union
  select 3 as c1, 'arizona' as city 
""").createOrReplaceTempView("table3")

spark.table("table1").show()
spark.table("table2").show()
spark.table("table3").show()

# for simplicity, union table2 and table 3

spark.sql(""" select * from table2 union all select * from table3 """).createOrReplaceTempView("city_mappings")
spark.table("city_mappings").show()

# now join to the ids:

spark.sql("""
  select id1, id2, city from table1
  join city_mappings on c1 = id1 or c1 = id2
""").createOrReplaceTempView("id_to_city")

# and finally you can aggregate: 

spark.sql("""
select id1, id2, collect_list(city)
from id_to_city
group by id1, id2
""").createOrReplaceTempView("result")

table("result").show()

# result looks like this, you can reshape to better suit your needs :
+---+---+------------------+
|id1|id2|collect_list(city)|
+---+---+------------------+
|  1|  2|   [london, paris]|
|  3|  4|  [texas, arizona]|
+---+---+------------------+
...