Как эффективно сопоставить ключи из одного набора данных на основе значений из другого набора данных - PullRequest
0 голосов
/ 23 марта 2020

Предполагая, что кадр данных 1 представляет целевую страну и список стран-источников, а кадр данных 2 представляет доступность для всех стран, найдите все пары из кадра данных 1, где отображение целевой страны равно ИСТИНА и страна-источник отображение FALSE :

Dataframe 1 (targetId, sourceId):
США: Китай, Россия, Индия, Япония
Китай: США, Россия , Индия
Россия: США, Япония

Рамка данных 2 (идентификатор, доступен):
США: правда
Китай: ложь
Россия: правда
Индия: false
Япония: true

Результат Набор данных должен выглядеть следующим образом:
(США, Китай),
(США, Индия)

Моя идея состоит в том, чтобы сначала взорвать набор данных1, создать новый фрейм данных (скажем, tempDF), добавить в него 2 новых столбца: targetAvailable, sourceAvailable и, наконец, отфильтровать для targetAvailable = false и sourceAvailable = true, чтобы получить требуемый фрейм данных результата.

Ниже приведен фрагмент моего кода:

 val sourceDF = sourceData.toDF("targetId", "sourceId")
 val mappingDF = mappingData.toDF("id", "available")
 val tempDF = sourceDF.select(col("targetId"), 
                explode(col("sourceId")).as("source_id_split"))

 val resultDF = tempDF.select("targetId")
         .withColumn("targetAvailable", isAvailable(tempDF.col("targetId")))
         .withColumn("sourceAvailable", isAvailable(tempDF.col("source_id_split")))


 /*resultDF.select("targetId", "sourceId").
  filter(col("targetAvailable") === "true" and col("sourceAvailable") 
  === "false").show()*/


// udf to find the availability value for the given id from the mapping table
val isAvailable = udf((searchId: String) => {
val rows = mappingDF.select("available")
          .filter(col("id") === searchId).collect()

if (rows(0)(0).toString.equals("true")) "true" else "false"  })

Вызов isAvailable UDF при вычислении resultDF вызывает у меня странное исключение. Я делаю что-то неправильно? Есть ли лучший / простой способ сделать это?

1 Ответ

0 голосов
/ 24 марта 2020

В вашем UDF вы ссылаетесь на другой фрейм данных, что невозможно, поэтому вы получаете "странное" исключение.

Вы хотите отфильтровать один фрейм данных на основе значений, содержащихся в другом. Что вам нужно сделать, так это объединить столбцы id. На самом деле в вашем случае два соединения: одно для целей, другое для источников.

Идея использовать explode, однако, очень хороша. Вот способ достичь того, что вы хотите:

// generating data, please provide this code next time ;-)
val sourceDF = Seq("USA" ->  Seq("China", "Russia", "India", "Japan"),
                   "China" -> Seq("USA", "Russia", "India"),
                   "Russia" -> Seq("USA", "Japan"))
               .toDF("targetId", "sourceId")
val mappingDF = Seq("USA" -> true, "China" -> false,
                    "Russia" -> true, "India" -> false,
                    "Japan" -> true)
               .toDF("id", "available")

sourceDF
    // we can filter available targets before exploding.
    // let's do it to be more efficient.
    .join(mappingDF.withColumnRenamed("id", "targetId"), Seq("targetId"))
    .where('available)
    // exploding the sources
    .select('targetId, explode('sourceId) as "sourceId")
    // then we keep only non available sources
    .join(mappingDF.withColumnRenamed("id", "sourceId"), Seq("sourceId"))
    .where(! 'available)
    .select("targetId", "sourceId")
    .show(false)

, который дает

+--------+--------+
|targetId|sourceId|
+--------+--------+
|USA     |China   |
|USA     |India   |
+--------+--------+
...