как обновить строку на основе другой строки с тем же идентификатором - PullRequest
0 голосов
/ 24 декабря 2018

В кадре данных Spark я хочу обновить значение строки на основе других строк с таким же идентификатором.

Например, у меня есть записи ниже,

id,value
1,10
1,null
1,null
2,20
2,null
2,null

Я хочу получитьрезультат как показано ниже

id,value
1,10
1,10
1,10
2,20
2,20
2,20

Подводя итог, столбец значения может быть пустым в некоторых строках, я хочу обновить их, если есть другая строка с таким же идентификатором, которая имеет допустимое значение., Я могу просто написать предложение об обновлении с помощью inner-join, но я не нашел того же способа в Spark-sql.

update привлекает внутреннее объединение объединить объединения b на a.id = b.id set a.value = b.value (это то, как я делаю это в sql)

Ответы [ 2 ]

0 голосов
/ 24 декабря 2018

Давайте воспользуемся методом SQL для решения этой проблемы -

myValues = [(1,10),(1,None),(1,None),(2,20),(2,None),(2,None)]
df = sqlContext.createDataFrame(myValues,['id','value'])

df.registerTempTable('table_view')
df1=sqlContext.sql(
    'select id, sum(value) over (partition by id) as value from table_view'
)
df1.show()
+---+-----+
| id|value|
+---+-----+
|  1|   10|
|  1|   10|
|  1|   10|
|  2|   20|
|  2|   20|
|  2|   20|
+---+-----+

Предупреждение: Код Thos предполагает, что для любого конкретного id существует только одно значение non-null.Когда мы groupby значения, мы должны использовать функцию aggregation, и я использовал sum.Если для любого id есть 2 non-null значений, то сумма будет суммирована.Если id может иметь несколько значений non-null, то лучше использовать min/max, чтобы мы получили одно из значений, а не sum.

df1=sqlContext.sql(
    'select id, max(value) over (partition by id) as value from table_view'
)
0 голосов
/ 24 декабря 2018

Для этого можно использовать окно (в pyspark):

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# create dataframe
df = sc.parallelize([
    [1,10],
    [1,None],
    [1,None],
    [2,20],
    [2,None],
    [2,None],
]).toDF(('id', 'value'))

window = Window.partitionBy('id').orderBy(F.desc('value'))
df \
    .withColumn('value', F.first('value').over(window)) \
    .show()

Результаты:

+---+-----+
| id|value|
+---+-----+
|  1|   10|
|  1|   10|
|  1|   10|
|  2|   20|
|  2|   20|
|  2|   20|
+---+-----+

Вы можете использовать те же функции в Scala.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...