Spark Groupby: разрешить одну запись в более чем двух группах - PullRequest
0 голосов
/ 28 апреля 2018

Предположим, у меня есть СДР, ключи которого имеют вид [1, 2, 3, 4, 5...], теперь я хочу сгруппировать записи по нескольким интервалам для их параллельной обработки, но группы, которые я хочу разделить, пересекаются друг с другом.

Например, я хочу сгруппировать их в следующие группы: [1 ~ 10], [5 ~ 15], [10 ~ 20] ..., таким образом, [1 ~ 10] и [5 ~ 15] для обработки потребуется запись 7.

Как я могу это сделать?

1 Ответ

0 голосов
/ 28 апреля 2018

Может пересчитать ключи?

class Bucket:
    def __init__(self, min_, max_):
        self._min = min_
        self._max = max_

    def __str__(self):
        return "{0}~{1}".format(self._min, self._max)
    def get_bucket_or_none(self, key):
        if self._min <= key <= self._max:
            return str(self)
        else:
            return None


def make_new_key_using_bucket_list(buckets_list, x):
    return_list = []
    for bucket in buckets_list:
        new_key = bucket.get_bucket_or_none(x[0])
        if new_key is not None:
            return_list.append( (new_key, x[1]))

    return return_list


rdd = sc.parallelize([(1, "A"), (5, "B"), (10, "C"), (10, "D"), (5, "E"),
                      (10, "F"), (7, "G"), (14, "H"), (18, "I"), (23, "J")])

buckets_list = [Bucket(1, 10), Bucket(5, 15), Bucket(10, 20)]

rdd_new_keys = rdd.flatMap(lambda x: make_new_key_using_bucket_list(buckets_list, x))

print rdd_new_keys.groupByKey().map(lambda x: (x[0], list(x[1]))).collect()

[('10 ~ 20', ['C', 'D', 'F', 'H', 'I']), ('5 ~ 15', ['B', 'C', «D», «E», «F», «G», «H»]), («1 ~ 10», [«A», «B», «C», «D», «E», 'F', 'G'])]

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...