группировка по значению ключа pyspark - PullRequest
0 голосов
/ 05 июля 2019

Я пытаюсь сгруппировать значение (ключ, значение) с помощью apache spark (pyspark). Мне удается выполнить группировку по ключу, но внутри я хочу сгруппировать значения, как в следующем примере.

Мне нужно сгруппировать по cout () столбец GYEAR.

%pyspark

rdd1 = sc.textFile("/datos/apat63_99.txt")

rdd2 = rdd1.map(lambda line :  line.split(",") ).map(lambda l : (l[4],l[1],l[0]))

for line in rdd2.take(6):
    print(line)

######################

rdd3 = rdd2.map(lambda line:(line[0],(line[1:]) ))

rddx = rdd2.groupByKey()
rddx.take(5)

Я ожидаю, что результат будет:

IN:

(u'"COUNTRY"', u'"GYEAR"', u'"PATENT"')
(u'"BE"', u'1963', u'3070801')
(u'"BE"', u'1964', u'3070811')
(u'"US"', u'1963', u'3070802')
(u'"US"', u'1963', u'3070803')
(u'"US"', u'1963', u'3070804')
(u'"US"', u'1963', u'3070805')
(u'"US"', u'1964', u'3070807')

OUT: * +1011 *

(u'"BE"', [(u'1963', 1), (u'1964', 1)])
(u'"US"', [(u'1963', 4), (u'1964', 2)])

Ответы [ 3 ]

0 голосов
/ 05 июля 2019

Как указывает @PIG, с DataFrames работать проще, чем с RDD.

Кроме того, я бы рекомендовал использовать create_map, collect_list и ваш собственный UDF для combine_maps. Это должно позволить вам продолжать работать со структурированными данными.

df2=df1.withColumn('result',F.create_map(df1.Gyear, df1.Patient))
df2.show()

+-------+-----+-------+-----------+
|country|Gyear|Patient|     result|
+-------+-----+-------+-----------+
|      S| 1963|      4|[1963 -> 4]|
|     BE| 1963|      1|[1963 -> 1]|
|      S| 1964|      1|[1964 -> 1]|
|     BE| 1964|      1|[1964 -> 1]|
+-------+-----+-------+-----------+

from typing import List, Dict
from pyspark.sql.functions import udf
from functools import reduce
from pyspark.sql.types import *

def combine_map(x: Dict[str, int], y: Dict[str, int]) -> Dict[str, int]:
    return {k: x.get(k, 0) + y.get(k, 0) for k in set(x) | set(y)}


@udf(returnType=MapType(StringType(), IntegerType()))
def combine_maps(maps):
    return reduce(combine_map, maps, {})

df2.groupBy("country").agg(F.collect_list("result").alias("result")) \
   .withColumn("result", combine_maps("result")) \
   .show(truncate=False) 

+-------+----------------------+
|country|result                |
+-------+----------------------+
|S      |[1964 -> 1, 1963 -> 4]|
|BE     |[1964 -> 1, 1963 -> 1]|
+-------+----------------------+
0 голосов
/ 06 июля 2019

Вот один способ с методами RDD:

from operator import add

# initialize the RDD
rdd = sc.parallelize([(u'"COUNTRY"', u'"GYEAR"', u'"PATENT"')
    , (u'"BE"', u'1963', u'3070801')
    , (u'"BE"', u'1964', u'3070811')
    , (u'"US"', u'1963', u'3070802')
    , (u'"US"', u'1963', u'3070803')
    , (u'"US"', u'1963', u'3070804')
    , (u'"US"', u'1963', u'3070805')
    , (u'"US"', u'1964', u'3070807')])

Выполните следующие действия:

  1. установить кортеж (COUNTRY, GYEAR) в качестве ключа, 1 в качестве значения
  2. подсчитать ключи с помощью ReduceByKey (добавить)
  3. установите ключ на COUNTRY, значение на [(GYEAR, cnt)], где cnt рассчитывается на основе предыдущего ReduByKey
  4. запустить reduceByKey(add), чтобы объединить список с тем же ключом (COUNTRY).
  5. использовать фильтр для удаления заголовка

    rdd_new = rdd.map(lambda x: ((x[0],x[1]), 1) ) \
                 .reduceByKey(add) \
                 .map(lambda x: (x[0][0], [(x[0][1],x[1])])) \
                 .reduceByKey(add) \
                 .filter(lambda x: x[0] != '"COUNTRY"')
    

Проверить результат:

>>> rdd_new.take(2)
[(u'"US"', [(u'1964', 1), (u'1963', 4)]),
 (u'"BE"', [(u'1963', 1), (u'1964', 1)])]
0 голосов
/ 05 июля 2019

Это то, что вы ищите? Приведенное ниже решение может быть неэффективным.

Запуск на спарке 2.3 Ubuntu 18.04

    from pyspark.sql import SparkSession
    import pyspark.sql.functions as F

    spark = SparkSession.builder.appName("test").enableHiveSupport().getOrCreate()


    l = [
    ('BE', '1963', '3070801'),
    ('BE', '1964', '3070811'),
    ('S', '1963', '3070802'),
    ('S', '1963', '3070803'),
    ('S', '1963', '3070804'),
    ('S', '1963', '3070805'),
    ('S', '1964', '3070807')]

    colmns = ['country', 'Gyear', 'Patient']

    df=spark.createDataFrame(l, colmns)
    df.show()


+-------+-----+-------+
|country|Gyear|Patient|
+-------+-----+-------+
|     BE| 1963|3070801|
|     BE| 1964|3070811|
|      S| 1963|3070802|
|      S| 1963|3070803|
|      S| 1963|3070804|
|      S| 1963|3070805|
|      S| 1964|3070807|
+-------+-----+-------+

    df1=df.groupBy("country","Gyear").agg(F.count("Patient").alias("Patient"))
    df1.show()

+-------+-----+-------+
|country|Gyear|Patient|
+-------+-----+-------+
|      S| 1963|      4|
|     BE| 1963|      1|
|      S| 1964|      1|
|     BE| 1964|      1|
+-------+-----+-------+
    df2=df1.withColumn('result',F.concat(F.lit('('),df1.Gyear,F.lit(','),df1.Patient,F.lit(')'))).drop("Gyear","Patient")
    df2.show()

+-------+--------+
|country|  result|
+-------+--------+
|      S|(1963,4)|
|     BE|(1963,1)|
|      S|(1964,1)|
|     BE|(1964,1)|
+-------+--------+    

    df2.groupBy("country").agg(F.collect_list("result")).show()

+-------+--------------------+
|country|collect_list(result)|
+-------+--------------------+
|      S|[(1963,4), (1964,1)]|
|     BE|[(1963,1), (1964,1)]|
+-------+--------------------+
...