pyspark java.io.IOException: не удается переименовать файл - PullRequest
0 голосов
/ 22 февраля 2019

Я продолжаю сталкиваться с этой ошибкой, когда входные файлы большие.Был бы признателен за любую помощь.

Причина: java.io.IOException: не удалось переименовать файл C: \ spark \ storage_space \ blockmgr-34e6f308-0735-45bd-9cfc-5087accfe917 \ 0c \ shuffle_14_1_0.data.da203675-cf88-4977-81d8-1906072fe655 в C: \ spark \ storage_space \ blockmgr-34e6f308-0735-45bd-9cfc-5087accfe917 \ 0c \ shuffle_14_1_0.data в org.apache.spark.shffffle.verlockShle183) в org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write (BypassMergeSortShuffleWriter.java:164) в org.apache.spark.scheduler.ShuffleMapTask.runTask (Shuffleg..ShuffleMapTask.runTask (ShuffleMapTask.scala: 53) в org.apache.spark.scheduler.Task.run (Task.scala: 108) в org.apache.spark.executor.Executor $ TaskRunner.run (Executor.scala: 338) в java.util.concurrent.ThreadPoolExecutor.runWorker (ThreadPoolExecutor.java:1149) в java.util.concurrent.ThreadPoolExecutor $ Worker.run (ThreadPoolExecutor.java:624) ... еще 1

19/02/21 17:52:01 ИНФОРМАЦИЯ SparkContext: вызов остановки () из ловушки отключения 19/02/21 17:52:01 ИНФОРМАЦИЯ SparkUI: остановлен веб-интерфейс Spark в http://127.0.0.1:4040 19/ 02/21 17:52:01 ИНФОРМАЦИЯ MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint остановлен!19/02/21 17:52:02 INFO MemoryStore: очистка MemoryStore 19/02/21 17:52:02 INFO BlockManager: BlockManager остановлен 19.02.21 17:52:02 INFO BlockManagerMaster: BlockManagerMaster остановлен 19/02/2117:52:02 INFO OutputCommitCoordinator $ OutputCommitCoordinatorEndpoint: OutputCommitCoordinator остановлен!19/02/21 17:52:02 ИНФОРМАЦИЯ SparkContext: успешно остановлен SparkContext 19/02/21 17:52:02 ИНФОРМАЦИЯ ShutdownHookManager: вызван хук отключения 19/02/21 17:52:02 ИНФОРМАЦИЯ ShutdownHookManager: удаление каталога C: \spark \ storage_space \ spark-af366c79-78e7-4b55-9513-d411cd1d418c 19/02/21 17:52:02 INFO ShutdownHookManager: удаление каталога C: \ spark \ storage_space \ spark-af366c79-78e7-4b55-9513-d411cd1dark18-e4a37af5-7a19-4000-b921-7351285766d2

Это мой код:

import re
import sys
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext
import numpy as np
from copy import deepcopy

conf = SparkConf()
conf.set('spark.local.dir', 'C:/spark/storage_space')
sc = SparkContext(conf=conf)
lines = sc.textFile(sys.argv[1])
MAX_ITER = 40
beta = 0.8
nodes = 100 #to change


edges = lines.map(lambda x: (int(re.split('\s+', x)[0]), int(re.split('\s+', x)[1])))
unique_edges = edges.distinct() #there might be repeated ones
key_edges = unique_edges.map(lambda x: (int(x[0]),1))
out_edges = key_edges.reduceByKey(lambda accum, n: accum + n) #store key as the noe number and value as number of outgoing edges
out_edges_lst = unique_edges.collect()
output_dict = dict((int(key),value) for key,value in out_edges.collect())
m_init = unique_edges.map(lambda x: ((x[0], x[1]),1/output_dict.get(x[0])))
m_ji = m_init.map(lambda x: ((x[0][1], x[0][0]), x[1])) #((destin, source), 1/deg_source) #row,col

def hits():
    lambdax = 1
    miu =1
    L = unique_edges #row, col
    #LT = L.map(lambda x: (x[1], x[0])) #col, row
    new_h = [1 for i in range(nodes)]
    for i in range(MAX_ITER):
        ###PROCESSING A FIRST ###
        LT_h = L.map(lambda x: (x[1], new_h[x[0]-1]*miu)) #h[col-1] #(row, value)
        LT_h_add = LT_h.reduceByKey(lambda x, y: x+y)
        LT_max = LT_h_add.max(lambda x: x[1])[1] #returns max pair, and get the value
        new_a = LT_h_add.map(lambda x: (x[0]-1,float(x[1])/LT_max)).collectAsMap()
        print("this is iteration round" + str(i))
        print("printing new a")
        print(new_a)
        #a is stored as a dictionary
        ##PROCESSING h###
        L_a = L.map(lambda x: (x[0], new_a[x[1]-1]*lambdax))
        L_a_add = L_a.reduceByKey(lambda x, y: x+y)
        L_a_add_max = L_a_add.max(lambda x: x[1])[1]
        new_h = L_a_add.map(lambda x: (x[0]-1, float(x[1])/L_a_add_max)).collectAsMap()
        print(new_h)
    final_a = sorted(new_a.items(), key = lambda x: x[1])[:5]
    final_h = sorted(new_h.items(), key = lambda x: x[1])[:5]
    print("printing top 5 for H")
    print(final_h)
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...