Как разбить массив на куски, найти сумму кусков и сохранить выходные данные в виде массива в pyspark - PullRequest
1 голос
/ 05 марта 2020

У меня есть фрейм данных, как показано ниже:

+-----+------------------------+
|Index|   finalArray           |
+-----+------------------------+
|1    |[0, 2, 0, 3, 1, 4, 2, 7]|
|2    |[0, 4, 4, 3, 4, 2, 2, 5]|
+-----+------------------------+

Я хочу разбить массив на куски по 2, а затем найти сумму каждого куска и сохранить результирующий массив в столбце finalArray. Это будет выглядеть ниже:

+-----+---------------------+
|Index|    finalArray       |
+-----+---------------------+
|1    |[2, 3, 5, 9]         |
|2    |[4, 7, 6, 7]         |
+-----+---------------------+

Я могу сделать это, создав UDF, но в поисках лучшего и оптимизированного способа. Предпочтительно, если я могу обработать это, используя withColumn и передавая flagArray, чтобы сделать это без необходимости писать UDF.

@udf(ArrayType(DoubleType()))
def aggregate(finalArray,chunkSize):
   n = int(chunkSize)
   aggsum = []
   final = [finalArray[i * n:(i + 1) * n] for i in range((len(finalArray) + n - 1) // n )]
   for item in final:
      agg = 0
      for j in item:
         agg += j
         aggsum.append(agg)
   return aggsum

Я не могу использовать приведенное ниже выражение в UDF, поэтому я использовал циклы

[sum(finalArray[x:x+2]) for x in range(0, len(finalArray), chunkSize)]

Ответы [ 2 ]

5 голосов
/ 05 марта 2020

Для spark 2.4+ вы можете попробовать sequence + transform :

from pyspark.sql.function import expr

df = spark.createDataFrame([
  (1, [0, 2, 0, 3, 1, 4, 2, 7]),
  (2, [0, 4, 4, 3, 4, 2, 2, 5])
], ["Index", "finalArray"])

df.withColumn("finalArray", expr("""
    transform(
      sequence(0,ceil(size(finalArray)/2)-1), 
      i -> finalArray[2*i] + ifnull(finalArray[2*i+1],0))
 """)).show(truncate=False)
+-----+------------+
|Index|finalArray  |
+-----+------------+
|1    |[2, 3, 5, 9]|
|2    |[4, 7, 6, 7]|
+-----+------------+

Для размера фрагмента любого N используйте aggregate функция для подведения итогов:

N = 3

sql_expr = """
    transform(
      /* create a sequence from 0 to number_of_chunks-1 */
      sequence(0,ceil(size(finalArray)/{0})-1),
      /* iterate the above sequence */
      i -> 
        /* create a sequence from 0 to chunk_size-1 
           calculate the sum of values containing every chunk_size items by their indices
         */
        aggregate(
          sequence(0,{0}-1),
          0L, 
          (acc, y) -> acc + ifnull(finalArray[i*{0}+y],0)
        )
    )
"""
df.withColumn("finalArray", expr(sql_expr.format(N))).show()                                                        
+-----+----------+
|Index|finalArray|
+-----+----------+
|    1| [2, 8, 9]|
|    2| [8, 9, 7]|
+-----+----------+
0 голосов
/ 05 марта 2020

Вот немного другая версия решения @ jx c, использующего функцию slice с функциями transform и aggregate.

Лог c предназначен для каждого элемента массива, который мы проверяем, если его индекс кратен chunk size, и используем slice, чтобы получить подмассив chunk size. С aggregate мы суммируем элементы каждого подмассива. Наконец, используя filter для удаления нулей (соответствует индексам, которые не удовлетворяют i % chunk = 0.

chunk = 2

transform_expr = f"""
filter(transform(finalArray, 
                 (x, i) -> IF (i % {chunk} = 0, 
                               aggregate(slice(finalArray, i+1, {chunk}), 0L, (acc, y) -> acc + y),
                               null
                              )
                ),
      x -> x is not null)
"""

df.withColumn("finalArray", expr(transform_expr)).show()

#+-----+------------+
#|Index|  finalArray|
#+-----+------------+
#|    1|[2, 3, 5, 9]|
#|    2|[4, 7, 6, 7]|
#+-----+------------+
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...