org. apache .spark.storage.DiskBlockObjectWriter: Неперехваченное исключение при возврате - PullRequest
0 голосов
/ 04 августа 2020

2020-08-04,12: 21: 55,779 ERROR org. apache .spark.storage.DiskBlockObjectWriter: Неперехваченное исключение при возврате частичной записи в файл / home / work / hdd4 / yarn / zjypr c -hadoop / nodemanager / usercache / h_mifi / appcache / application_1596012257775_905468 / blockmgr-5357b9cf-7f50-49ca-9b59-456879d45344 / 28 / temp_shuffle_a54ed661-e781-4f6c2 * .Index.index.index. .nio.channels.spi.AbstractInterruptibleChannel.end (AbstractInterruptibleChannel. java: 202) в sun.nio.ch.FileChannelImpl.truncate (FileChannelImpl. java: 372) в org. apache .spark.stlockObject. $$ anonfun $ revertPartialWritesAndClose $ 2. примените $ mcV $ sp (DiskBlockObjectWriter. scala: 223) в орг. apache .spark.util.Utils $ .tryWithSafeFinally (Utils. scala: 1390) в орг. apache .spark.storage.DiskBlockObjectWriter.revertPartialWritesAndClose (DiskBlockObjectWriter. scala: 219) в организации apache .spark.shuffle.sort.BypassMergeSortShuffleWriter.stop (BypassMergeSortShuffleWriter.stop (BypassMergeSortShuffleWriter.stop (BypassMergeSortSort): * 26 в орг. apache .spark.scheduler.ShuffleMapTask.runTask (ShuffleMapTask. scala: 102) в орг. apache .spark.scheduler.ShuffleMapTask.runTask (ShuffleMapTask. scala: 53) в орг. apache .spark.scheduler.Task.run (Task. scala: 109) в организации apache .spark.executor.Executor $ TaskRunner $$ anonfun $ 4.apply (Executor. scala: 366) в организации . apache .spark.util.Utils $ .tryWithSafeFinally (Utils. scala: 1381) в организации apache .spark.executor.Executor $ TaskRunner.run (Executor. scala: 372) в java .util.concurrent.ThreadPoolExecutor.runWorker (ThreadPoolExecutor. java: 1149) в java .util.concurrent.ThreadPoolExecutor $ Worker.run (ThreadPoolExecutor. java: 624) в java. запустить (Thread. java: 748)

ts = spark.sparkContext.broadcast(transform)
def transform(df):
   cols = df.dtypes
   for col,types in cols:
       if types != "string":
           c_min = df.select(col).rdd.min()[0]
           c_max = df.select(col).rdd.max()[0]
           if types in ['double','float']: 
               if c_min > np.finfo(np.float32).min and c_max < np.finfo(np.float32).max:
                   df = df.withColumn(col,df[col].cast("float"))
               else:
                   df = df.withColumn(col,df[col].cast("double"))
           else:
               if c_min > np.iinfo(np.int8).min and c_max < np.iinfo(np.int8).max:
                   df = df.withColumn(col,df[col].cast("tinyint"))
               elif c_min > np.iinfo(np.int16).min and c_max < np.iinfo(np.int16).max:
                   df = df.withColumn(col,df[col].cast("smallint"))
               elif c_min > np.iinfo(np.int32).min and c_max < np.iinfo(np.int32).max:
                   df = df.withColumn(col,df[col].cast("int"))
               elif c_min > np.iinfo(np.int64).min and c_max < np.iinfo(np.int64).max:
                   df = df.withColumn(col,df[col].cast("bigint"))
   return df 
df = ts.value(df)
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...