Как уменьшитьByKey в PySpark с настраиваемой группировкой строк? - PullRequest
0 голосов
/ 22 мая 2019

У меня есть датафрейм, который выглядит следующим образом:

items_df
======================================================
| customer   item_type    brand    price    quantity |  
|====================================================|
|  1         bread        reems     20         10    |  
|  2         butter       spencers  10         21    |  
|  3         jam          niles     10         22    |
|  1         bread        marks     16         18    |
|  1         butter       jims      19         12    |
|  1         jam          jills     16         6     |
|  2         bread        marks     16         18    |
======================================================

Я создаю rdd, который преобразует вышеприведенное в dict:

rdd = items_df.rdd.map(lambda row: row.asDict())

Результат выглядит так:

[
   { "customer": 1, "item_type": "bread", "brand": "reems", "price": 20, "quantity": 10 },
   { "customer": 2, "item_type": "butter", "brand": "spencers", "price": 10, "quantity": 21 },
   { "customer": 3, "item_type": "jam", "brand": "niles", "price": 10, "quantity": 22 },
   { "customer": 1, "item_type": "bread", "brand": "marks", "price": 16, "quantity": 18 },
   { "customer": 1, "item_type": "butter", "brand": "jims", "price": 19, "quantity": 12 },
   { "customer": 1, "item_type": "jam", "brand": "jills", "price": 16, "quantity": 6 },
   { "customer": 2, "item_type": "bread", "brand": "marks", "price": 16, "quantity": 18 }
]

Я хотел бы сгруппировать вышеупомянутые строки сначала по клиенту. Затем я хотел бы ввести новые пользовательские ключи «хлеб», «масло», «джемы» и сгруппировать все эти строки для этого клиента. Таким образом, мой RDD сокращается с 7 строк до 3 строк.

Вывод будет выглядеть следующим образом:

[
    { 
        "customer": 1, 
        "breads": [
            {"item_type": "bread", "brand": "reems", "price": 20, "quantity": 10},
            {"item_type": "bread", "brand": "marks", "price": 16, "quantity": 18},
        ],
        "butters": [
            {"item_type": "butter", "brand": "jims", "price": 19, "quantity": 12}
        ],
        "jams": [
            {"item_type": "jam", "brand": "jills", "price": 16, "quantity": 6}
        ]
    },
    {
        "customer": 2,
        "breads": [
            {"item_type": "bread", "brand": "marks", "price": 16, "quantity": 18}
        ],
        "butters": [
            {"item_type": "butter", "brand": "spencers", "price": 10, "quantity": 21}
        ],
        "jams": []
    },
    {
        "customer": 3,
        "breads": [],
        "butters": [],
        "jams": [
            {"item_type": "jam", "brand": "niles", "price": 10, "quantity": 22}
        ]
    }
]

Кто-нибудь знает, как достичь вышеуказанного с помощью PySpark? Я хотел бы знать, если есть решение, использующее ReduceByKey () или что-то подобное. Я надеюсь избежать использования groupByKey (), если это возможно.

Ответы [ 2 ]

1 голос
/ 22 мая 2019

Сначала добавьте столбец item_types в сводный кадр данных.

items_df = items_df.withColumn('item_types', F.concat(F.col('item_type'),F.lit('s')))
items_df.show()

+--------+---------+--------+-----+--------+----------+
|customer|item_type|   brand|price|quantity|item_types|
+--------+---------+--------+-----+--------+----------+
|       1|    bread|   reems|   20|      10|    breads|
|       2|   butter|spencers|   10|      21|   butters|
|       3|      jam|   niles|   10|      22|      jams|
|       1|    bread|   marks|   16|      18|    breads|
|       1|   butter|    jims|   19|      12|   butters|
|       1|      jam|   jills|   16|       6|      jams|
|       2|    bread|   marks|   16|      18|    breads|
+--------+---------+--------+-----+--------+----------+

Затем вы можете повернуть таблицу с группой customer и использовать F.collect_list() для агрегирования других столбцов одновременно.

items_df = items_df.groupby(['customer']).pivot("item_types").agg(
    F.collect_list(F.struct(F.col("item_type"),F.col("brand"), F.col("price"),F.col("quantity")))
).sort('customer')
items_df.show()

+--------+--------------------+--------------------+--------------------+
|customer|              breads|             butters|                jams|
+--------+--------------------+--------------------+--------------------+
|       1|[[bread, reems, 2...|[[butter, jims, 1...|[[jam, jills, 16,...|
|       2|[[bread, marks, 1...|[[butter, spencer...|                  []|
|       3|                  []|                  []|[[jam, niles, 10,...|
+--------+--------------------+--------------------+--------------------+

Наконец, вам нужно установить recursive=True, чтобы преобразовать вложенную строку в dict.

rdd = items_df.rdd.map(lambda row: row.asDict(recursive=True))
print(rdd.take(10))


[{'customer': 1,
  'breads': [{'item_type': u'bread', 'brand': u'reems', 'price': 20, 'quantity': 10},
             {'item_type': u'bread', 'brand': u'marks', 'price': 16, 'quantity': 18}],
  'butters': [{'item_type': u'butter', 'brand': u'jims', 'price': 19, 'quantity': 12}],
  'jams': [{'item_type': u'jam', 'brand': u'jills', 'price': 16, 'quantity': 6}]},
 {'customer': 2,
  'breads': [{'item_type': u'bread', 'brand': u'marks', 'price': 16, 'quantity': 18}],
  'butters': [{'item_type': u'butter', 'brand': u'spencers', 'price': 10, 'quantity': 21}],
  'jams': []},
 {'customer': 3,
  'breads': [],
  'butters': [],
  'jams': [{'item_type': u'jam', 'brand': u'niles', 'price': 10, 'quantity': 22}]}]
0 голосов
/ 23 мая 2019

Я также использовал другой подход с использованием reduByKey () в rdd. Учитывая dataframe items_df, сначала преобразуйте его в rdd:

rdd = items_df.rdd.map(lambda row: row.asDict())

Преобразование каждой строки в кортеж (customer, [row_obj]), где у нас есть row_obj в списке:

rdd = rdd.map(lambda row: ( row["customer"], [row] ) )

Группировка по клиенту с использованием reduByKey, где списки объединяются для данного клиента:

rdd = rdd.reduceByKey(lambda x,y: x+y)

Преобразовать кортеж обратно в dict, где ключ - это клиент, а значение - список всех связанных строк:

rdd = rdd.map(lambda tup: { tup[0]: tup[1] } )

Поскольку все данные о клиентах теперь располагаются в ряд, мы можем отделить данные в виде хлебов, масел и джемов с помощью пользовательской функции:

def organize_items_in_customer(row):
    cust_id = list(row.keys())[0]
    items = row[cust_id]
    new_cust_obj = { "customer": cust_id, "breads": [], "butters": [], "jams": [] }
    plurals = { "bread":"breads", "butter":"butters", "jam":"jams" }
    for item in items:
        item_type = item["item_type"]
        key = plurals[item_type]
        new_cust_obj[key].append(item)
    return new_cust_obj

Вызовите вышеуказанную функцию для преобразования rdd:

rdd = rdd.map(organize_items_in_customer)
...