Написать уникальные значения в Spark, сохраняя старые значения - PullRequest
0 голосов
/ 28 января 2019

У меня есть задание Spark, для которого запланировано время.

Когда я записываю DataFrame результата в Цель данных (S3, HDFS, DB ...), яхотите, чтобы то, что пишет Spark, не дублировалось для определенного столбца.

ПРИМЕР:

Допустим, MY_ID - уникальный столбец.

1-е выполнение:

--------------
|MY_ID|MY_VAL|
--------------
|  1  |   5  |
|  2  |   9  |
|  3  |   6  |
--------------

2-е выполнение:

--------------
|MY_ID|MY_VAL|
--------------
|  2  |   9  |
|  3  |   2  |
|  4  |   4  |
--------------

То, что я ожидаю найти в Data Target после двух выполнений, выглядит примерно так:

--------------
|MY_ID|MY_VAL|
--------------
|  1  |   5  |
|  2  |   9  |
|  3  |   6  |
|  4  |   4  |
--------------

Где ожидаемый результат - результат первого выполнения с добавлением результатов второго выполнения.Если значение для MY_ID уже существует, старое сохраняется, отбрасывая результаты новых выполнений (в этом случае 2-е выполнение хочет записать для MY_ID 3 MY_VAL 9. Так как эта запись уже существуетс 1-го выполнения новая запись отбрасывается).

Так что функции distinct() недостаточно, чтобы гарантировать это условие.Уникальность столбца MY_ID должна сохраняться даже в выгруженном выводе.

Есть ли какое-либо решение, которое может гарантировать это свойство при разумных вычислительных затратах?(Это в основном та же идея UNIQUE в реляционных базах данных.)

Ответы [ 2 ]

0 голосов
/ 28 января 2019

Вы можете сделать fullOuterJoin на первой и второй итерации.

val joined = firstIteration.join(secondIteration, Seq("MY_ID"), "fullouter")

scala> joined.show
+-----+------+------+
|MY_ID|MY_VAL|MY_VAL|
+-----+------+------+
|    1|     5|  null|
|    3|     6|     2|
|    4|  null|     4|
|    2|     9|     9|
+-----+------+------+

Если в результирующей таблице значение firstIteration MY_VAL имеет значение, вы можете использовать его как есть.Иначе, если его null (указывает, что ключ встречается только во второй итерации).используйте значение из secondIteration MY_VAL.

scala> joined.withColumn("result", when(firstIteration.col("MY_VAL").isNull, secondIteration.col("MY_VAL"))
        .otherwise(firstIteration.col("MY_VAL")))
       .drop("MY_VAL")
       .show
+-----+------+
|MY_ID|result|
+-----+------+
|    1|     5|
|    3|     6|
|    4|     4|
|    2|     9|
+-----+------+
0 голосов
/ 28 января 2019

Не уверен, используете ли вы Scala или Python, но взгляните на функцию dropDuplicates, которая позволяет указать один или несколько столбцов: https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.Dataset

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...