Spark инкрементная загрузка перезаписывает старую запись - PullRequest
0 голосов
/ 03 декабря 2018

У меня есть требование сделать добавочную загрузку в таблицу с помощью Spark (PySpark)

Вот пример:

День 1

id | value
-----------
1  | abc
2  | def

День 2

id | value
-----------
2  | cde
3  | xyz

Ожидаемый результат

id | value
-----------
1  | abc
2  | cde
3  | xyz

Это легко сделать в реляционной базе данных,
Интересно, можно ли это сделать в Spark или другом трансформационном инструменте, например, Presto?

Ответы [ 3 ]

0 голосов
/ 03 декабря 2018

Обходной путь, добавьте столбец даты в кадре данных, затем ранжируйте на основе идентификатора и упорядочите по дате в порядке убывания и получите ранг == 1. Это всегда даст вам самую последнюю запись на основе идентификатора.

df.("rank", rank().over(Window.partitionBy($"id").orderBy($"date".desc)))
  .filter($"rank" === 1)
  .drop($"rank")
  .orderBy($"id")
  .show
0 голосов
/ 03 декабря 2018

Вот, пожалуйста!Первый фрейм данных:

 >>> list1 = [(1, 'abc'),(2,'def')]
 >>> olddf = spark.createDataFrame(list1, ['id', 'value'])
 >>> olddf.show();
 +---+-----+
 | id|value|
 +---+-----+
 |  1|  abc|
 |  2|  def|
 +---+-----+

Второй фрейм данных:

>>> list2 = [(2, 'cde'),(3,'xyz')]
>>> newdf = spark.createDataFrame(list2, ['id', 'value'])
>>> newdf.show();
+---+-----+
| id|value|
+---+-----+
|  2|  cde|
|  3|  xyz|
+---+-----+

теперь объедините и объедините эти два файла данных, используя функцию объединения

from pyspark.sql.functions import *

>>> df = olddf.join(newdf, olddf.id == newdf.id,'full_outer').select(coalesce(olddf.id,newdf.id).alias("id"),coalesce(newdf.value,olddf.value).alias("value"))
>>> df.show();
+---+-----+
| id|value|
+---+-----+
|  1|  abc|
|  3|  xyz|
|  2|  cde|
+---+-----+

Я надеюсь, что это должно решить вашу проблему,: -)

0 голосов
/ 03 декабря 2018

добавление данных в фрейм осуществляется функцией union в pyspark.Я продемонстрирую пример и создам 2 кадра данных, как вы упомянули в вопросе.

from pyspark.sql.types import Row
df1 = sqlContext.createDataFrame([Row(id=1,value="abc"),Row(id=2,value="def")])

df1.show()
+---+-----+
| id|value|
+---+-----+
|  1|  abc|
|  2|  def|
+---+-----+

df2 = sqlContext.createDataFrame([Row(id=2,value="cde"),Row(id=3,value="xyz")])
df2.show()
+---+-----+
| id|value|
+---+-----+
|  2|  cde|
|  3|  xyz|
+---+-----+

Давайте сделаем union между двумя кадрами данных, и вы получите желаемый результат.

df2.union(df1).dropDuplicates(["id"]).show()
+---+-----+
| id|value|
+---+-----+
|  1|  abc|
|  3|  xyz|
|  2|  cde|
+---+-----+

Вы можете отсортировать вывод, используя asc из pyspark.sql.functions

from pyspark.sql.functions import asc


df2.union(df1).dropDuplicates(["id"]).sort(asc("id")).show()
+---+-----+
| id|value|
+---+-----+
|  1|  abc|
|  2|  cde|
|  3|  xyz|
+---+-----+
...