В чем разница между partitionBy и groupBy в искре - PullRequest
0 голосов
/ 17 июня 2020

У меня есть pyspark rdd, который можно собирать в виде списка кортежей, как показано ниже:

rdds = self.sc.parallelize([(("good", "spark"), 1), (("sood", "hpark"), 1), (("god", "spak"), 1),
                                (("food", "spark"), 1), (("fggood", "ssspark"), 1), (("xd", "hk"), 1),
                                (("good", "spark"), 7), (("good", "spark"), 3), (("good", "spark"), 4),
                                (("sood", "hpark"), 5), (("sood", "hpark"), 7), (("xd", "hk"), 2),
                                (("xd", "hk"), 1), (("fggood", "ssspark"), 2), (("fggood", "ssspark"), 1)], 6)
rdds.glom().collect()

def inner_map_1(p):
    d = defaultdict(int)
    for row in p:
        d[row[0]] += row[1]
    for item in d.items():
        yield item

rdd2 = rdds.partitionBy(4, partitionFunc=lambda x: hash(x)).mapPartitions(inner_map_1)
print(rdd2.glom().collect())

def inner_map_2(p):
    for row in p:
        item = row[0]
        sums = sum([num for _, num in row[1]])
        yield item, sums
rdd3 = rdds.groupBy(lambda x: x[0]).mapPartitions(inner_map_2)
print(rdd3.glom().collect())

Есть rdd2 и rdd3, которые вычисляются в другой форме, и я получаю тот же результат, но я не уверен, правда ли, что rdd2 и rdd3 дают одинаковый результат, и элементы находятся в одном разделе.

Ответы [ 2 ]

1 голос
/ 17 июня 2020

partitionBy обычно означает, что вы набираете sh ключей раздела и отправляете их в определенный раздел RDD. Это размещает что-либо с совпадающим ключом в одном разделе, что полезно при выполнении соединений, когда вам нужны все совпадающие ключи в одном месте. partitionBy НЕ удаляет какие-либо записи, он только размещает совпадающие ключи.

df.partitionBy ("vendorId") // все строки сохраняются, они теперь размещены в одном разделе rdd

groupBy - это концепция SQL. Если найдет все уникальные комбинации клавиш. Вы также можете выполнять агрегатные функции для всех записей с одним и тем же ключом. например, если вы хотите подсчитать все записи с одним и тем же ключом, вы можете сделать ...

df.groupBy ("vendorId"). count.show

Какой будет считать все записи с одним и тем же идентификатором поставщика. В отличие от partitionBy, groupBy имеет тенденцию значительно сокращать количество записей. (см. мощность)

Я бы предложил запустить df.groupBy ("vendorId"). объяснять (true). это распечатает логический план (подумайте о эквиваленте SQL) и физический план (точный набор операций, которые будет выполнять Spark). В общем случае spark переводит groupBy в частичное ha sh aggregate -> shuffle (разделение по ключу) -> final ha sh aggregate -> результаты

0 голосов
/ 17 июня 2020

Я бы сказал, что groupBy - более логичный способ сгруппировать ваши данные. Похоже, что «groupBy» в SQL.

«PartitionBy» более физический. Вы действительно физически разделяете свои данные в кластере.

...