Вложенный для петли распараллеливания в искре - PullRequest
0 голосов
/ 31 марта 2019

Я пытаюсь распараллелить существующий алгоритм в искре (способом, который будет масштабироваться). Я упростил это для целей вопроса, но это выглядит примерно так:

for p in all_p:
    all_q = calculate1(p)

    results_for_p = []
    for q in all_q:
        results_for_p.append(calculate2(q))

    save_results(results_for_p)

По сути, я вложил цикл с двумя долго выполняющимися функциями, которые я хотел бы запустить параллельно. Однако параметры вложенной функции calculate2 имеют переменный размер в зависимости от каждого p.

Моя попытка состояла в том, чтобы сгладить входные данные, чтобы при расчете2 все all_q и all_p работали вместе:

rdd = sc.parallelize(all_p)
all_q_each_p = rdd.map(calculate1).collect()

# flatten output to something we can parallelize:
all_q_all_p = []
for all_q in all_q_each_p:
    all_q_all_p.append(all_q)

rdd = sc.parallelize(all_q_all_p)
res = rdd.map(calculate2).collect()

# How to do this?? 
collect_and_save_all_results(res)

Как написать это так, чтобы оно хорошо масштабировалось?

1 Ответ

0 голосов
/ 06 апреля 2019

Это как раз та проблема, которую решает flatMap.flatMap изменяет размер rdd по умолчанию.

Код становится намного проще:

rdd = sc.parallelize(all_p)

rdd.flatMap(calculate1).map(
    lambda args: calculate2(*args)
).collect()
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...