Объедините две строки в Pyspark, если выполняется условие - PullRequest
0 голосов
/ 21 февраля 2019

У меня есть таблица данных PySpark, которая выглядит следующим образом:

shouldMerge | number
true        | 1
true        | 1
true        | 2
false       | 3
false       | 1 

Я хочу объединить все столбцы с mustMerge как true и сложить числа.

, так что окончательныйвывод будет выглядеть как

shouldMerge | number
true        | 4
false       | 3
false       | 1

Как я могу выбрать все из них с shouldMerge == true, сложить числа и сгенерировать новую строку в PySpark?

Редактировать: чередовать, слегкаболее сложный сценарий ближе к тому, что я пытаюсь решить, где мы собираем только положительные числа:

mergeId     | number
1           | 1
2           | 1
1           | 2
-1          | 3
-1          | 1 

shouldMerge | number
1        | 3
2        | 1
-1       | 3
-1       | 1

Ответы [ 3 ]

0 голосов
/ 21 февраля 2019

IIUC, вы хотите сделать groupBy, но только на положительных mergeId с.

Один из способов - отфильтровать ваш DataFrame по положительным идентификаторам, сгруппировать, агрегировать и объединить его с отрицательными id s (аналогично @ shanmuga ).

Другой способ - использовать when для динамического создания ключа группировки.Если mergeId положительный, используйте mergeId для группировки.В противном случае используйте monotonically_increasing_id, чтобы гарантировать, что строка не агрегируется.

Вот пример:

import pyspark.sql.functions as f

df.withColumn("uid", f.monotonically_increasing_id())\
    .groupBy(
        f.when(
            f.col("mergeId") > 0, 
            f.col("mergeId")
        ).otherwise(f.col("uid")).alias("mergeKey"), 
        f.col("mergeId")
    )\
    .agg(f.sum("number").alias("number"))\
    .drop("mergeKey")\
    .show()
#+-------+------+
#|mergeId|number|
#+-------+------+
#|     -1|   1.0|
#|      1|   3.0|
#|      2|   1.0|
#|     -1|   3.0|
#+-------+------+

Это можно легко обобщить, изменив условие when (в этом случае f.col("mergeId") > 0) соответствует вашим конкретным требованиям.


Объяснение :

Сначала мы создаем временный столбец uid, который является уникальнымID для каждой строки.Затем мы вызываем groupBy и, если mergeId положительный, используйте mergeId для группировки.В противном случае мы используем uid в качестве mergeKey.Я также передал mergeId как вторую группу по столбцу, чтобы сохранить этот столбец для вывода.

Чтобы продемонстрировать, что происходит, взглянем на промежуточный результат:

df.withColumn("uid", f.monotonically_increasing_id())\
    .withColumn(
        "mergeKey",
        f.when(
            f.col("mergeId") > 0, 
            f.col("mergeId")
        ).otherwise(f.col("uid")).alias("mergeKey")
    )\
    .show()
#+-------+------+-----------+-----------+
#|mergeId|number|        uid|   mergeKey|
#+-------+------+-----------+-----------+
#|      1|     1|          0|          1|
#|      2|     1| 8589934592|          2|
#|      1|     2|17179869184|          1|
#|     -1|     3|25769803776|25769803776|
#|     -1|     1|25769803777|25769803777|
#+-------+------+-----------+-----------+

Как видите, mergeKey остается уникальным значением для отрицательных mergeId с.

На этом промежуточном этапе желаемый результат представляет собой просто тривиальную группу по сумме и сумме с последующим удалением столбца mergeKey.

0 голосов
/ 21 февраля 2019

Первая проблема, опубликованная OP.

# Create the DataFrame
valuesCol = [(True,1),(True,1),(True,2),(False,3),(False,1)]
df = sqlContext.createDataFrame(valuesCol,['shouldMerge','number'])
df.show()
+-----------+------+
|shouldMerge|number|
+-----------+------+
|       true|     1|
|       true|     1|
|       true|     2|
|      false|     3|
|      false|     1|
+-----------+------+

# Packages to be imported
from pyspark.sql.window import Window
from pyspark.sql.functions import when, col, lag
# Register the dataframe as a view
df.registerTempTable('table_view')
df=sqlContext.sql(
    'select shouldMerge, number, sum(number) over (partition by shouldMerge) as sum_number from table_view'
)
df = df.withColumn('number',when(col('shouldMerge')==True,col('sum_number')).otherwise(col('number')))
df.show()
+-----------+------+----------+
|shouldMerge|number|sum_number|
+-----------+------+----------+
|       true|     4|         4|
|       true|     4|         4|
|       true|     4|         4|
|      false|     3|         4|
|      false|     1|         4|
+-----------+------+----------+

df = df.drop('sum_number')
my_window = Window.partitionBy().orderBy('shouldMerge')
df = df.withColumn('shouldMerge_lag', lag(col('shouldMerge'),1).over(my_window))
df.show()
+-----------+------+---------------+
|shouldMerge|number|shouldMerge_lag|
+-----------+------+---------------+
|      false|     3|           null|
|      false|     1|          false|
|       true|     4|          false|
|       true|     4|           true|
|       true|     4|           true|
+-----------+------+---------------+

df = df.where(~((col('shouldMerge')==True) & (col('shouldMerge_lag')==True))).drop('shouldMerge_lag')
df.show()
+-----------+------+
|shouldMerge|number|
+-----------+------+
|      false|     3|
|      false|     1|
|       true|     4|
+-----------+------+

Для второй проблемы, опубликованной OP

# Create the DataFrame
valuesCol = [(1,2),(1,1),(2,1),(1,2),(-1,3),(-1,1)]
df = sqlContext.createDataFrame(valuesCol,['mergeId','number'])
df.show()
+-------+------+
|mergeId|number|
+-------+------+
|      1|     2|
|      1|     1|
|      2|     1|
|      1|     2|
|     -1|     3|
|     -1|     1|
+-------+------+

# Packages to be imported
from pyspark.sql.window import Window
from pyspark.sql.functions import when, col, lag
# Register the dataframe as a view
df.registerTempTable('table_view')
df=sqlContext.sql(
    'select mergeId, number, sum(number) over (partition by mergeId) as sum_number from table_view'
)
df = df.withColumn('number',when(col('mergeId') > 0,col('sum_number')).otherwise(col('number')))
df.show()
+-------+------+----------+
|mergeId|number|sum_number|
+-------+------+----------+
|      1|     5|         5|
|      1|     5|         5|
|      1|     5|         5|
|      2|     1|         1|
|     -1|     3|         4|
|     -1|     1|         4|
+-------+------+----------+

df = df.drop('sum_number')
my_window = Window.partitionBy('mergeId').orderBy('mergeId')
df = df.withColumn('mergeId_lag', lag(col('mergeId'),1).over(my_window))
df.show()
+-------+------+-----------+
|mergeId|number|mergeId_lag|
+-------+------+-----------+
|      1|     5|       null|
|      1|     5|          1|
|      1|     5|          1|
|      2|     1|       null|
|     -1|     3|       null|
|     -1|     1|         -1|
+-------+------+-----------+

df = df.where(~((col('mergeId') > 0) & (col('mergeId_lag').isNotNull()))).drop('mergeId_lag')
df.show()
+-------+------+
|mergeId|number|
+-------+------+
|      1|     5|
|      2|     1|
|     -1|     3|
|     -1|     1|
+-------+------+

Документация: lag () - Возвращает значение смещения строк перед текущей строкой.

0 голосов
/ 21 февраля 2019

Вам нужно будет отфильтровать только те строки, в которых следует объединить true и агрегировать.затем объедините это со всеми оставшимися строками.

import pyspark.sql.functions as functions
df = sqlContext.createDataFrame([
    (True, 1),
    (True, 1),
    (True, 2),
    (False, 3),
    (False, 1),
], ("shouldMerge", "number"))

false_df = df.filter("shouldMerge = false")
true_df = df.filter("shouldMerge = true")
result = true_df.groupBy("shouldMerge")\
    .agg(functions.sum("number").alias("number"))\
    .unionAll(false_df)




df = sqlContext.createDataFrame([
    (1, 1),
    (2, 1),
    (1, 2),
    (-1, 3),
    (-1, 1),
], ("mergeId", "number"))

merge_condition = df["mergeId"] > -1
remaining = ~merge_condition
grouby_field = "mergeId"

false_df = df.filter(remaining)
true_df = df.filter(merge_condition)
result = true_df.groupBy(grouby_field)\
    .agg(functions.sum("number").alias("number"))\
    .unionAll(false_df)

result.show()
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...