Поиск и обновление столбца набора данных Spark со значениями из другого набора данных - PullRequest
0 голосов
/ 24 января 2020

Java 8 и Spark 2.11: 2.3.2 здесь. Хотя я бы предпочел Java ответов API, я немного говорю Scala, поэтому я смогу понять любые ответы, представленные в нем! Но Java если это вообще возможно (пожалуйста)!

У меня есть два набора данных с различной схемой, с исключением из общего столбца "model_number" (строка): который существует на обоих.

Для каждой строки в моем первом наборе данных (назовем это d1) мне нужно отсканировать / найти второй набор данных ("d2"), чтобы увидеть, есть ли строка с тем же model_number, и если да, обновите другой столбец d2.

Вот мои схемы набора данных:

d1
===========
model_number : string
desc : string
fizz : string
buzz : date

d2
===========
model_number : string
price : double
source : string

Итак, еще раз, если строка d1 имеет model_number из, скажем, 12345, и строка d2 также имеет тот же model_number, я хочу обновить d2.price, умножив его на 10.0.

Моя лучшая попытка на данный момент :

// I *think* this would give me a 3rd dataset with all d1 and d2 columns, but only
// containing rows from d1 and d2 that have matching 'model_number' values
Dataset<Row> d3 = d1.join(d2, d1.col("model_number") == d2.col("model_number"));

// now I just need to update d2.price based on matching
Dataset<Row> d4 = d3.withColumn("adjusted_price", d3.col("price") * 10.0);

Может кто-нибудь помочь мне пересечь линию фини sh здесь? Заранее спасибо!

1 Ответ

1 голос
/ 24 января 2020

Некоторые моменты здесь, как @VamsiPrabhala упомянул в комментарии, вам нужно использовать функцию join в указанных вами c полях. Что касается "update", вы должны иметь в виду, что df, ds и rdd в spark являются неизменяемыми, поэтому вы не можете update их. Таким образом, решение здесь заключается в том, что после join ваших df необходимо выполнить вычисления, в данном случае умножения, в select или с использованием withColumn, а затем select. Другими словами, вы не можете обновить столбец, но вы можете создать новый df со столбцом "new".

Пример:

Input data:

+------------+------+------+----+
|model_number|  desc|  fizz|buzz|
+------------+------+------+----+
|     model_a|desc_a|fizz_a|null|
|     model_b|desc_b|fizz_b|null|
+------------+------+------+----+

+------------+-----+--------+
|model_number|price|  source|
+------------+-----+--------+
|     model_a| 10.0|source_a|
|     model_b| 20.0|source_b|
+------------+-----+--------+

с использованием join выведет:

val joinedDF = d1.join(d2, "model_number")
joinedDF.show()

+------------+------+------+----+-----+--------+
|model_number|  desc|  fizz|buzz|price|  source|
+------------+------+------+----+-----+--------+
|     model_a|desc_a|fizz_a|null| 10.0|source_a|
|     model_b|desc_b|fizz_b|null| 20.0|source_b|
+------------+------+------+----+-----+--------+

, применяя ваш расчет:

joinedDF.withColumn("price", col("price") * 10).show()

output:
+------------+------+------+----+-----+--------+
|model_number|  desc|  fizz|buzz|price|  source|
+------------+------+------+----+-----+--------+
|     model_a|desc_a|fizz_a|null| 100.0|source_a|
|     model_b|desc_b|fizz_b|null| 200.0|source_b|
+------------+------+------+----+-----+--------+
...