Распараллелить алгоритм сортировки используя pyspark - PullRequest
0 голосов
/ 08 декабря 2018

Доброе утро, я разработал простой алгоритм сортировки слиянием, с помощью которого я хочу сравнить его производительность, когда он распараллелен и не распараллелен.

Сначала я создаю список чисел для сортировки и проверки того, какдля сортировки списка требуется много времени.

Следующее, что я хочу сделать, это передать список чисел в sc.parallelize() и преобразовать list в RDD, а затем передатьФункция сортировки слиянием в mapPartitions() и затем collect().

import random
import time
from pyspark import SparkContext

def execute_merge_sort(generated_list):
    start_time = time.time()
    sorted_list = merge_sort(generated_list)
    elapsed = time.time() - start_time
    print('Simple merge sort: %f sec' % elapsed)
    return sorted_list


def generate_list(length):
    N = length
    generated_list = [random.random() for num in range(N)]
    return generated_list

def merging(left_side, right_side):
    result = []
    i = j = 0
    while i < len(left_side) and j < len(right_side):
        if left_side[i] <= right_side[j]:
            result.append(left_side[i])
            i += 1
        else:
            result.append(right_side[j])
            j += 1
    if i == len(left_side):
        result.extend(right_side[j:])
    else:
        result.extend(left_side[i:])
    return result


def merge_sort(generated_list):
    if len(generated_list) <= 1:
        return generated_list
    middle_value = len(generated_list) // 2
    sorted_list = merging(merge_sort(generated_list[:middle_value]), merge_sort(generated_list[middle_value:]))
    return sorted_list


def is_sorted(num_array):
    for i in range(1, len(num_array)):
        if num_array[i] < num_array[i - 1]:
            return False
    return True

generate_list = generate_list(500000)

sorted_list = execute_merge_sort(generate_list)

sc = SparkContext()

rdd = sc.parallelize(generate_list).mapPartitions(execute_merge_sort).collect()

Когда я выполняю это sc.parallelize(generate_list).mapPartitions(execute_merge_sort).collect(), я получаю следующую ошибку:

File "<ipython-input-15-1b7974b4fa56>", line 7, in execute_merge_sort
  File "<ipython-input-15-1b7974b4fa56>", line 36, in merge_sort
TypeError: object of type 'itertools.chain' has no len()

Любойпомощь будет оценена.Заранее спасибо.

1 Ответ

0 голосов
/ 13 декабря 2018

Я выяснил, как решить проблему TypeError: 'float' object is not iterable.

. Эту проблему можно решить путем сглаживания данных с использованием flatMap(lambda x: x) и вызова glom(), чтобы обернуть список и сделать его исполняемым с помощьюфункция execute_merge_sort.Выполнив следующую строку, возвращаемый результат представляет собой список, содержащий отсортированные списки.

sc.parallelize(random_list_of_lists).flatMap(lambda x: x).glom().mapPartitions(execute_merge_sort_rdd).collect()
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...