I keep running into this error when the input files are large. I would appreciate any help.
Caused by: java.io.IOException: fail to rename file C:\spark\storage_space\blockmgr-34e6f308-0735-45bd-9cfc-5087accfe917\0c\shuffle_14_1_0.data.da203675-cf88-4977-81d8-1906072fe655 to C:\spark\storage_space\blockmgr-34e6f308-0735-45bd-9cfc-5087accfe917\0c\shuffle_14_1_0.data
    at org.apache.spark.shuffle.IndexShuffleBlockResolver.writeIndexFileAndCommit(IndexShuffleBlockResolver.scala:183)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:164)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:...)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
    at org.apache.spark.scheduler.Task.run(Task.scala:108)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    ... 1 more
19/02/21 17:52:01 INFO SparkContext: Invoking stop() from shutdown hook
19/02/21 17:52:01 INFO SparkUI: Stopped Spark web UI at http://127.0.0.1:4040
19/02/21 17:52:01 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
19/02/21 17:52:02 INFO MemoryStore: MemoryStore cleared
19/02/21 17:52:02 INFO BlockManager: BlockManager stopped
19/02/21 17:52:02 INFO BlockManagerMaster: BlockManagerMaster stopped
19/02/21 17:52:02 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
19/02/21 17:52:02 INFO SparkContext: Successfully stopped SparkContext
19/02/21 17:52:02 INFO ShutdownHookManager: Shutdown hook called
19/02/21 17:52:02 INFO ShutdownHookManager: Deleting directory C:\spark\storage_space\spark-af366c79-78e7-4b55-9513-d411cd1d418c
19/02/21 17:52:02 INFO ShutdownHookManager: Deleting directory C:\spark\storage_space\spark-...-e4a37af5-7a19-4000-b921-7351285766d2
This is my code:
import re
import sys
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext
import numpy as np
from copy import deepcopy
conf = SparkConf()
conf.set('spark.local.dir', 'C:/spark/storage_space')
sc = SparkContext(conf=conf)
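# spark.local.dir is the scratch space Spark uses for shuffle files;
# the blockmgr-* path in the rename failure above lives under this directory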
lines = sc.textFile(sys.argv[1])
MAX_ITER = 40
beta = 0.8
nodes = 100 #to change
edges = lines.map(lambda x: (int(re.split(r'\s+', x)[0]), int(re.split(r'\s+', x)[1])))
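# each input line is expected to be a whitespace-separated "source destination" pair of integer node ids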
unique_edges = edges.distinct() #there might be repeated ones
key_edges = unique_edges.map(lambda x: (int(x[0]),1))
out_edges = key_edges.reduceByKey(lambda accum, n: accum + n) #store key as the node number and value as number of outgoing edges
out_edges_lst = unique_edges.collect()
output_dict = dict((int(key),value) for key,value in out_edges.collect())
m_init = unique_edges.map(lambda x: ((x[0], x[1]), 1.0/output_dict.get(x[0])))
m_ji = m_init.map(lambda x: ((x[0][1], x[0][0]), x[1])) #((destin, source), 1/deg_source) #row,col
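# m_init holds ((source, dest), 1/out_degree) and m_ji flips the key to ((dest, source), 1/out_degree);
# neither is used inside hits() below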
def hits():
    lambdax = 1
    miu = 1
    L = unique_edges #row, col
    #LT = L.map(lambda x: (x[1], x[0])) #col, row
    new_h = [1 for i in range(nodes)]
    for i in range(MAX_ITER):
        ###PROCESSING A FIRST###
        LT_h = L.map(lambda x: (x[1], new_h[x[0]-1]*miu)) #h[row-1] keyed by col -> (col, value)
        LT_h_add = LT_h.reduceByKey(lambda x, y: x+y)
        LT_max = LT_h_add.max(lambda x: x[1])[1] #returns the max pair, then takes its value
        new_a = LT_h_add.map(lambda x: (x[0]-1, float(x[1])/LT_max)).collectAsMap()
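        # new_a maps 0-based node index -> authority score, scaled so the largest value is 1 this round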
print("this is iteration round" + str(i))
print("printing new a")
print(new_a)
#a is stored as a dictionary
##PROCESSING h###
L_a = L.map(lambda x: (x[0], new_a[x[1]-1]*lambdax))
L_a_add = L_a.reduceByKey(lambda x, y: x+y)
L_a_add_max = L_a_add.max(lambda x: x[1])[1]
new_h = L_a_add.map(lambda x: (x[0]-1, float(x[1])/L_a_add_max)).collectAsMap()
print(new_h)
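    # after MAX_ITER rounds, rank nodes by their final authority (a) and hub (h) scores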
    final_a = sorted(new_a.items(), key=lambda x: x[1], reverse=True)[:5]
    final_h = sorted(new_h.items(), key=lambda x: x[1], reverse=True)[:5]
    print("printing top 5 for H")
    print(final_h)
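hits() is only defined above, so it still has to be invoked for the iterations (and their shuffles) to actually run; roughly how the job is driven, with placeholder script/file names:

hits()  # collectAsMap()/max() inside the loop trigger the reduceByKey shuffles
# launched as, for example:
#   spark-submit hits_job.py edges.txt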