Как уменьшить и суммировать сетки с помощью in Scala Spark DF - PullRequest
0 голосов
/ 14 июля 2020

Можно ли уменьшить сетку nxn в Scala Spark DF до общей суммы сетки и создать новую df? Существующий df:

1 1 0 0 0 0 0 0
0 0 0 0 0 0 1 0
0 1 0 0 0 0 0 0
0 0 0 0 0 0 0 0
0 0 0 0 0 0 0 0
0 1 0 0 0 0 1 1
0 1 0 0 0 0 1 0
0 0 0 0 1 0 0 0

если n = 4, то можем ли мы взять сетку 4x4 из этого df, просуммировать их?

1 1 0 0 | 0 0 0 0
0 0 0 0 | 0 0 1 0
0 1 0 0 | 0 0 0 0
0 0 0 0 | 0 0 0 0
------------------
0 0 0 0 | 0 0 0 0
0 1 0 0 | 0 0 1 1
0 1 0 0 | 0 0 1 0
0 0 0 0 | 1 0 0 0

и получить этот результат?

3 1
2 4

Ответы [ 2 ]

0 голосов
/ 15 июля 2020

Проверьте код ниже.

scala> df.show(false)
+---+---+---+---+---+---+---+---+
|a  |b  |c  |d  |e  |f  |g  |h  |
+---+---+---+---+---+---+---+---+
|1  |1  |0  |0  |0  |0  |0  |0  |
|0  |0  |0  |0  |0  |0  |1  |0  |
|0  |1  |0  |0  |0  |0  |0  |0  |
|0  |0  |0  |0  |0  |0  |0  |0  |
|0  |0  |0  |0  |0  |0  |0  |0  |
|0  |1  |0  |0  |0  |0  |1  |1  |
|0  |1  |0  |0  |0  |0  |1  |0  |
|0  |0  |0  |0  |1  |0  |0  |0  |
+---+---+---+---+---+---+---+---+
scala> val n = 4

Это разделит или сгруппирует строки на 2, каждая группа имеет 4 строки данных.

scala> val rowExpr = ntile(n/2)
.over(
    Window
    .orderBy(lit(1))
)

Сбор всех значений в массив array.

scala> val aggExpr = df
.columns
.grouped(4)
.toList.map(c => collect_list(array(c.map(col):_*)).as(c.mkString))

Сглаживание массива, удаление 0 значений и получение размера массива.

scala> val selectExpr = df
.columns
.grouped(4)
.toList
.map(c => size(array_remove(flatten(col(c.mkString)),0)).as(c.mkString))

Применение rowExpr & selectExpr

scala> df
.withColumn("row_id",rowExpr)
.groupBy($"row_id")
.agg(aggExpr.head,aggExpr.tail:_*)
.select(selectExpr:_*)
.show(false)

Конечный результат

+----+----+
|abcd|efgh|
+----+----+
|3   |1   |
|2   |4   |
+----+----+
0 голосов
/ 15 июля 2020

Для строк необходимо агрегировать, а для столбцов - суммировать. пример кода для 2x2

import pyspark.sql.functions as F
from pyspark.sql.types import *
from pyspark.sql.window import Window
#Create test data frame
tst= sqlContext.createDataFrame([(1,1,2,11),(1,3,4,12),(1,5,6,13),(1,7,8,14),(2,9,10,15),(2,11,12,16),(2,13,14,17),(2,13,14,17)],schema=['col1','col2','col3','col4'])
w=Window.orderBy(F.monotonically_increasing_id())
tst1= tst.withColumn("grp",F.ceil(F.row_number().over(w)/2)) # 2 is for this example - change to 4
tst_sum_row = tst1.groupby('grp').agg(*[F.sum(coln).alias(coln) for coln in tst1.columns if 'grp' not in coln])
expr =[sum([F.col(tst.columns[i]),F.col(tst.columns[i+1])]).alias('coln'+str(i)) for i in [x*2 for x in (range(len(tst.columns)/2))]] # The sum used here is python inbuilt sum and not pyspark sum function which is referred as F.sum()
tst_sum_coln = tst_sum_row.select(*expr)

tst.show()
+----+----+----+----+
|col1|col2|col3|col4|
+----+----+----+----+
|   1|   1|   2|  11|
|   1|   3|   4|  12|
|   1|   5|   6|  13|
|   1|   7|   8|  14|
|   2|   9|  10|  15|
|   2|  11|  12|  16|
|   2|  13|  14|  17|
|   2|  13|  14|  17|
+----+----+----+----+

In [21]: tst_sum_coln.show()
+-----+-----+
|coln0|coln2|
+-----+-----+
|    6|   29|
|   14|   41|
|   24|   53|
|   30|   62|
+-----+-----+
...