почему Python Spark медленный при использовании для цикла - PullRequest
0 голосов
/ 14 октября 2019

Я изучаю pyspark из программы ранжирования страниц.
Но когда я использую цикл for для вычисления, каждая итерация становится медленнее.
Я пытаюсь использовать кеш, но, похоже, это не работает.
Понятия не имею, как решить эту проблему.
performance

Вот мой код цикла

from time import time
for idx, i in tqdm(enumerate(range(10))):

    start_time = time() # <-- start timing

    new_values = stochastic_matrix.flatMap(lambda x: get_new_value(x, beta, N))
    new_values = new_values.reduceByKey(add).map(lambda x: [x[0], x[1] + ((1-beta)/N)] )
    S = new_values.values().reduce(add)
    new_stochastic_matrix = stochastic_matrix.fullOuterJoin(new_values)
    stochastic_matrix = new_stochastic_matrix.map(lambda x: sum_new_value(x, S, N))

    new_stochastic_matrix.cache()
    stochastic_matrix.cache() # <--- cache here


    end_time = time()
    print(idx, end_time - start_time)

sorted(stochastic_matrix.collect())[:10]

Обновление

После того, как я прокомментирую эту строку

stochastic_matrix = new_stochastic_matrix.map(lambda x: sum_new_value(x, S, N))

Это работает нормально !! Но я все еще не знаю, почему и как это исправить.

Обновление 2

Я установил S как постоянную, скорость нормальная.
Но я все еще не знаюпочему и как это исправить.

Весь поток

После входных данных

переменная: stochastic_matrix - структура данных выглядит следующим образом.

[
(key,[value, this_node_connect_to_which_node]),
(1, [0.2, [2, 3]]),
(2, [0.2, [4]]),
(3, [0.2, [1, 4, 5]]),
(4, [0.2, []]),
(5, [0.2, [1, 4]])
]

Карта

def get_new_value(item, beta, N):
    key, tmp = item
    value, dest = tmp
    N_dest = len(dest)

    new_values = []
    for i in dest:
        new_values.append([i, beta * (value/ N_dest)] )

    return new_values

new_values = stochastic_matrix.flatMap(lambda x: get_new_value(x, beta, N))
new_values.collect()

########### output
[node, each_node_new_value]
[[2, 0.08000000000000002],
 [3, 0.08000000000000002],
 [4, 0.16000000000000003],
 [1, 0.05333333333333334],
 [4, 0.05333333333333334],
 [5, 0.05333333333333334],
 [1, 0.08000000000000002],
 [4, 0.08000000000000002]]

Сокращение по ключу

бета, а N - просто число с плавающей запятой

new_values = new_values.reduceByKey(add).map(lambda x: [x[0], x[1] + ((1-beta)/N)] )
new_values.collect()

###### Output
[[2, 0.12000000000000001],
 [3, 0.12000000000000001],
 [4, 0.33333333333333337],
 [1, 0.17333333333333334],
 [5, 0.09333333333333332]]

Объединение new_values ​​и stochastic_matrix

new_stochastic_matrix = stochastic_matrix.fullOuterJoin(new_values)
new_stochastic_matrix.collect()

#### Output
# (key, ([value, this_node_connect_to_which_node], new_value))

[(2, ([0.2, [4]], 0.12000000000000001)),
 (4, ([0.2, []], 0.33333333333333337)),
 (1, ([0.2, [2, 3]], 0.17333333333333334)),
 (3, ([0.2, [1, 4, 5]], 0.12000000000000001)),
 (5, ([0.2, [1, 4]], 0.09333333333333332))]

Обновление new_valueдля значения

S и N являются просто числом

def sum_new_value(item, S, N):
    key, value = item

    if value[1] == None:
        new_value = 0 + (1-S)/N
    else:
        new_value = value[1] + (1-S)/N


#     new_value = value[1]

    return [key, [new_value, value[0][1]]]

stochastic_matrix = new_stochastic_matrix.map(lambda x: sum_new_value(x, S, N))

sorted(stochastic_matrix.collect())[:10]

######## Output
[[1, [0.2053333333333333, [2, 3]]],
 [2, [0.152, [4]]],
 [3, [0.152, [1, 4, 5]]],
 [4, [0.36533333333333334, []]],
 [5, [0.1253333333333333, [1, 4]]]]
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...