Pandas_UDF Конкатенация l oop с iloc - PullRequest
0 голосов
/ 09 июля 2020

Я относительно новичок в Pandas и недавно застрял с функцией, определяемой пользователем.

Мой набор данных похож на:

|header|planned|
|  a   |   1   |
|  a   |   2   |
|  a   |   3   |
|  a   |   4   |
|  a   |   5   |
|  b   |   1   |
|  b   |   2   |
|  b   |   3   |
|  b   |   4   |
|  b   |   5   |

Мне нужно объединить значения в столбце planned группами из двух строк, чтобы получить что-то вроде этого:

|header|planned|p_cat|
|  a   |   1   | 1_2 | 
|  a   |   2   | 2_3 |
|  a   |   3   | 3_4 |
|  a   |   4   | 4_5 |
|  a   |   5   |     |
|  b   |   1   | 1_2 |
|  b   |   2   | 2_3 |
|  b   |   3   | 3_4 |
|  b   |   4   | 4_5 |
|  b   |   5   |     |

Числа в столбце planned не указаны в этом c порядке, но всегда будут целыми.

Мой UDF:

schema = ds_adh.schema

    @pandas_udf(schema, PandasUDFType.GROUPED_MAP)
    def concat_operations(ds_op):

        s = ds_op['planned']

        for index in range(ds_op['planned'].count()-1):
        
            # clearly working only for the last index
            couple = str([s.iloc[index]]) + '_' + str([s.iloc[index+1]])

        ds_op_new = ds_op

        ds_op_new ['p_cat'] = couple

        return ds_op_new

ds_adh = ds_adh.orderBy("time")
ds_adh = ds_adh.groupBy("header").apply(concat_operations)

Мои проблемы:

  • Сама конкатенация не работает
  • Я не знаю, как сделать sh конкатенацию в couple для всех итераций l oop

Я также безуспешно пытался с pandaSeries.

Вот ошибка, которую я получаю с этим кодом:

IndexError: single positional indexer is out-of-bounds

Ответы [ 2 ]

2 голосов
/ 09 июля 2020

Если это актуальная проблема, вы можете использовать встроенные команды pyspark, как показано ниже:

import pyspark.sql.functions as F
w = Window.partitionBy("header").orderBy("idx")

(df.withColumn("idx",F.monotonically_increasing_id())
   .withColumn("Lead",F.lead("planned").over(w))
   .withColumn("p_cat",F.when(F.col("Lead").isNull(),'')
   .otherwise(F.concat_ws("_","planned","Lead")))
   .orderBy("idx").drop("idx","Lead")).show()
+------+-------+-----+
|header|planned|p_cat|
+------+-------+-----+
|     a|      1|  1_2|
|     a|      2|  2_3|
|     a|      3|  3_4|
|     a|      4|  4_5|
|     a|      5|     |
|     b|      1|  1_2|
|     b|      2|  2_3|
|     b|      3|  3_4|
|     b|      4|  4_5|
|     b|      5|     |
+------+-------+-----+
1 голос
/ 09 июля 2020

Использовать встроенное окно lead функцию с partitionBy в заголовке и orderBy в запланированном столбце, As udf снизит производительность.

from pyspark.sql import *
from pyspark.sql.functions import *
w=Window.partitionBy("header").orderBy("planned")
df.withColumn("p_cat", when(lead(col("planned"),1).over(w).isNull(),lit("")).otherwise(concat_ws("_",col("planned"),lead(col("planned"),1).over(w)))).show()
#+------+-------+-----+
#|header|planned|p_cat|
#+------+-------+-----+
#|     a|      1|  1_2|
#|     a|      2|  2_3|
#|     a|      3|  3_4|
#|     a|      4|  4_5|
#|     a|      5|     |
#|     b|      1|  1_2|
#|     b|      2|  2_3|
#|     b|      3|  3_4|
#|     b|      4|  4_5|
#|     b|      5|     |
#+------+-------+-----+
...