Я изучаю pyspark из программы ранжирования страниц.
Но когда я использую цикл for для вычисления, каждая итерация становится медленнее.
Я пытаюсь использовать кеш, но, похоже, это не работает.
Понятия не имею, как решить эту проблему.
Вот мой код цикла
from time import time
for idx, i in tqdm(enumerate(range(10))):
start_time = time() # <-- start timing
new_values = stochastic_matrix.flatMap(lambda x: get_new_value(x, beta, N))
new_values = new_values.reduceByKey(add).map(lambda x: [x[0], x[1] + ((1-beta)/N)] )
S = new_values.values().reduce(add)
new_stochastic_matrix = stochastic_matrix.fullOuterJoin(new_values)
stochastic_matrix = new_stochastic_matrix.map(lambda x: sum_new_value(x, S, N))
new_stochastic_matrix.cache()
stochastic_matrix.cache() # <--- cache here
end_time = time()
print(idx, end_time - start_time)
sorted(stochastic_matrix.collect())[:10]
Обновление
После того, как я прокомментирую эту строку
stochastic_matrix = new_stochastic_matrix.map(lambda x: sum_new_value(x, S, N))
Это работает нормально !! Но я все еще не знаю, почему и как это исправить.
Обновление 2
Я установил S как постоянную, скорость нормальная.
Но я все еще не знаюпочему и как это исправить.
Весь поток
После входных данных
переменная: stochastic_matrix - структура данных выглядит следующим образом.
[
(key,[value, this_node_connect_to_which_node]),
(1, [0.2, [2, 3]]),
(2, [0.2, [4]]),
(3, [0.2, [1, 4, 5]]),
(4, [0.2, []]),
(5, [0.2, [1, 4]])
]
Карта
def get_new_value(item, beta, N):
key, tmp = item
value, dest = tmp
N_dest = len(dest)
new_values = []
for i in dest:
new_values.append([i, beta * (value/ N_dest)] )
return new_values
new_values = stochastic_matrix.flatMap(lambda x: get_new_value(x, beta, N))
new_values.collect()
########### output
[node, each_node_new_value]
[[2, 0.08000000000000002],
[3, 0.08000000000000002],
[4, 0.16000000000000003],
[1, 0.05333333333333334],
[4, 0.05333333333333334],
[5, 0.05333333333333334],
[1, 0.08000000000000002],
[4, 0.08000000000000002]]
Сокращение по ключу
бета, а N - просто число с плавающей запятой
new_values = new_values.reduceByKey(add).map(lambda x: [x[0], x[1] + ((1-beta)/N)] )
new_values.collect()
###### Output
[[2, 0.12000000000000001],
[3, 0.12000000000000001],
[4, 0.33333333333333337],
[1, 0.17333333333333334],
[5, 0.09333333333333332]]
Объединение new_values и stochastic_matrix
new_stochastic_matrix = stochastic_matrix.fullOuterJoin(new_values)
new_stochastic_matrix.collect()
#### Output
# (key, ([value, this_node_connect_to_which_node], new_value))
[(2, ([0.2, [4]], 0.12000000000000001)),
(4, ([0.2, []], 0.33333333333333337)),
(1, ([0.2, [2, 3]], 0.17333333333333334)),
(3, ([0.2, [1, 4, 5]], 0.12000000000000001)),
(5, ([0.2, [1, 4]], 0.09333333333333332))]
Обновление new_valueдля значения
S и N являются просто числом
def sum_new_value(item, S, N):
key, value = item
if value[1] == None:
new_value = 0 + (1-S)/N
else:
new_value = value[1] + (1-S)/N
# new_value = value[1]
return [key, [new_value, value[0][1]]]
stochastic_matrix = new_stochastic_matrix.map(lambda x: sum_new_value(x, S, N))
sorted(stochastic_matrix.collect())[:10]
######## Output
[[1, [0.2053333333333333, [2, 3]]],
[2, [0.152, [4]]],
[3, [0.152, [1, 4, 5]]],
[4, [0.36533333333333334, []]],
[5, [0.1253333333333333, [1, 4]]]]