Добавить статистику из цикла в таблицу Pyspark - PullRequest
0 голосов
/ 28 октября 2018

У меня есть следующий код Pyspark.На каждой итерации цикла я отфильтровываю все строки с определенной строкой в ​​столбце H. Затем я вычисляю некоторые статистические показатели по столбцу G (в результате получается 3 значения).Я хочу сохранить все статистические значения в одной таблице (строки: CM, NCM, FP; столбцы: POP, POP N, POP SN, POP QP).

from pyspark.sql import SparkSession
import pandas as pd
import numpy as np
import pyspark.sql.functions as F

spark = SparkSession.builder.master("local").appName("Word Count").config("spark.some.config.option", "some-value").getOrCreate()

np.random.seed(0)
rows = 1000
df_pandas = pd.DataFrame({ 'A' : 1.,
   'B' : pd.Timestamp('20130102'),
   'C' : pd.Series(1,index=list(range(rows)),dtype='float32'),
   'D' : np.array([3] * rows,dtype='int32'),
   'E' : pd.Categorical(np.random.choice(["test","train","frog", "chicken"], rows)),
   'F' : 'foo' ,
   'G' : np.random.choice(['CM', 'NCM', 'FP'], rows),
   'H' : np.random.choice(['POP', 'POP N', 'POP SN', 'POP QP'], rows)})                                 

df_spark = spark.createDataFrame(df_pandas)
blocks = ['POP', 'POP N', 'POP SN', 'POP QP']

for block in blocks:
    df_spark_trimmed = df_spark.filter(~F.col('H').isin(block))
    counts = df_spark_trimmed.groupby('G').count()
    counts.show()

1 Ответ

0 голосов
/ 29 октября 2018

Использование join:

import pyspark.sql.functions as F
count_by_g = df_spark.groupBy('G').agg(F.count('*').alias('CountByG'))
count_by_gh = df_spark.groupBy(['G', 'H']).agg(F.count('*').alias('CountByGH'))
count_by_g.join(count_by_gh, ['G']).selectExpr(
    'G', 'H', 'CountByG - CountByGH as count'
).groupBy('G').pivot('H').agg(F.max('count').alias('count')).show()

+---+---+-----+------+------+
|  G|POP|POP N|POP QP|POP SN|
+---+---+-----+------+------+
| CM|256|  260|   245|   250|
|NCM|265|  254|   248|   262|
| FP|246|  236|   239|   239|
+---+---+-----+------+------+

Или другое решение с оконными функциями:

df_spark.groupBy(['G', 'H']).count().selectExpr(
    'G', 'H', 'sum(count) over (partition by G) - count as count'
).groupBy('G').pivot('H').agg(F.max('count').alias('count')).show()
+---+---+-----+------+------+
|  G|POP|POP N|POP QP|POP SN|
+---+---+-----+------+------+
| CM|256|  260|   245|   250|
|NCM|265|  254|   248|   262|
| FP|246|  236|   239|   239|
+---+---+-----+------+------+
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...