pySpark - находит общие значения в сгруппированных данных - PullRequest
0 голосов
/ 30 июня 2018

Я пытаюсь найти общие значения среди групп, созданных путем применения groupBy и pivot к фрейму данных в pySpark. Например, данные выглядят так:

+--------+---------+---------+
|PlayerID|PitcherID|ThrowHand|
+--------+---------+---------+
|10000598| 10000104|        R|
|10000908| 10000104|        R|
|10000489| 10000104|        R|
|10000734| 10000104|        R|
|10006568| 10000104|        R|
|10000125| 10000895|        L|
|10000133| 10000895|        L|
|10006354| 10000895|        L|
|10000127| 10000895|        L|
|10000121| 10000895|        L|

После применения:

df.groupBy('PlayerID').pivot('ThrowHand').agg(F.count('ThrowHand')).drop('null').show(10)

Я получаю что-то вроде: -

+--------+----+---+
|PlayerID| L  |  R|
+--------+----+---+
|10000591|  11| 43|
|10000172|  22|101|
|10000989|  05| 19|
|10000454|  05| 17|
|10000723|  11| 33|
|10001989|  11| 38|
|10005243|  20| 60|
|10003366|  11| 26|
|10006058|  02| 09|
+--------+----+---+

есть ли какой-нибудь способ, которым я могу получить общие значения 'PitcherID' среди счетчиков L и R. в приведенном выше.

Что я имею в виду, для PlayerID = 10000591, у меня есть 11 PitcherID, где ThrowHand - L, и 43 PitcherID, где ThrowHand - 43. Возможно, что некоторые Pitchers распространены в этих 11 и 43 Pitchers, сгруппированных.

Можно ли как-нибудь получить эти общие идентификаторы PitcherID?

1 Ответ

0 голосов
/ 30 июня 2018

Сначала вы должны получить коллекцию pitcherIds для каждого броска как

import pyspark.sql.functions as F
#collect set of pitchers in addition to count of ThrowHand
df = df.groupBy('PlayerID').pivot('ThrowHand').agg(F.count('ThrowHand').alias('count'), F.collect_set('PitcherID').alias('PitcherID')).drop('null')

, который должен дать вам dataframe как

root
 |-- PlayerID: string (nullable = true)
 |-- L_count: long (nullable = false)
 |-- L_PitcherID: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- R_count: long (nullable = false)
 |-- R_PitcherID: array (nullable = true)
 |    |-- element: string (containsNull = true)

Затем напишите udf функцию, чтобы получить общие pitcherID s как

#columns with pitcherid and count
pitcherColumns = [x for x in df.columns if 'PitcherID' in x]
countColumns = [x for x in df.columns if 'count' in x]

#udf function to find the common pitcher between the collected pitchers
@F.udf(T.ArrayType(T.StringType()))
def commonFindingUdf(*pitcherCols):
    common = pitcherCols[0]
    for pitcher in pitcherCols[1:]:
        common = set(common).intersection(pitcher)
    return [x for x in common]

#calling the udf function and selecting the required columns
df.select(F.col('PlayerID'), commonFindingUdf(*[col(x) for x in pitcherColumns]).alias('common_PitcherID'), *countColumns)

, который должен дать вам окончательный dataframe как

root
 |-- PlayerID: string (nullable = true)
 |-- common_PitcherID: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- L_count: long (nullable = false)
 |-- R_count: long (nullable = false)

Надеюсь, ответ полезен

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...