"No space left on device" error and SIGTERM signal when using Spark LSH on EMR
asked 10 January 2019

Spark version: 2.3.2
EMR: 5.19.0
8 executors
5 cores per executor

What I am trying to do:
1. There are millions of unique feature files (.npy) in S3.
2. Using Spark, load all the numpy feature vectors.
3. Convert them into a Spark DataFrame, where the DataFrame schema is feature name (String) and feature vector (VectorUDT).
4. Then run an LSH similarity join with Spark between the DataFrames.

Code:

import numpy as np
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import BucketedRandomProjectionLSH
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
import pyspark.sql.functions as F
from pyspark.sql.types import StringType
from pyspark.sql.types import StructField
from pyspark.sql.types import StructType
from pyspark.ml.linalg import VectorUDT

spark = SparkSession \
    .builder \
    .appName('test-app') \
    .getOrCreate()

sc = spark.sparkContext
sqlContext = SQLContext(sc)

# Read every file under the S3 prefix as (path, raw bytes) and map each one to
# (file name without extension, dense feature vector).
features_rdd = sc. \
    binaryFiles("s3://input_path/"). \
    map(
    lambda x: (
        x[0].split("/")[-1].split(".")[0],
        # np.frombuffer is the non-deprecated equivalent of np.fromstring here
        Vectors.dense(np.frombuffer(x[1], dtype='<f4'))
    )
)

FEATURES_RAW_SCHEMA = StructType([StructField("image_id", StringType(), True),
                                  StructField("feature", VectorUDT(), True)])

features_df = sqlContext.createDataFrame(features_rdd, FEATURES_RAW_SCHEMA)

brp = BucketedRandomProjectionLSH(inputCol="feature", outputCol="hashes",
                                  seed=12345, numHashTables=30, bucketLength=1000)

model = brp.fit(features_df)

# Self-join: keep every pair of images whose Euclidean distance is below 50.0
all_pairs_df = model.approxSimilarityJoin(features_df, features_df, 50.0, distCol="similarity_score").select(
    F.col("datasetA.image_id").alias("image_id1"),
    F.col("datasetB.image_id").alias("image_id2"),
    F.col("similarity_score"))

all_pairs_df.write.mode("overwrite").json("s3a://output")

Error:

19/01/10 11:20:11 WARN TaskSetManager: Lost task 120.0 in stage 3.0 (TID 11, ip-ip1.ec2.internal, executor 5): java.io.IOException: No space left on device
    at java.io.FileOutputStream.writeBytes(Native Method)
    at java.io.FileOutputStream.write(FileOutputStream.java:326)
    at org.apache.spark.storage.TimeTrackingOutputStream.write(TimeTrackingOutputStream.java:58)
    at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
    at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126)
    at net.jpountz.lz4.LZ4BlockOutputStream.flushBufferedData(LZ4BlockOutputStream.java:220)
    at net.jpountz.lz4.LZ4BlockOutputStream.write(LZ4BlockOutputStream.java:173)
    at org.apache.spark.storage.DiskBlockObjectWriter.write(DiskBlockObjectWriter.scala:252)
    at org.apache.spark.util.collection.unsafe.sort.UnsafeSorterSpillWriter.write(UnsafeSorterSpillWriter.java:133)
    at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.spillIterator(UnsafeExternalSorter.java:498)
    at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.spill(UnsafeExternalSorter.java:222)
    at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.createWithExistingInMemorySorter(UnsafeExternalSorter.java:111)
    at org.apache.spark.sql.execution.UnsafeKVExternalSorter.<init>(UnsafeKVExternalSorter.java:156)
    at org.apache.spark.sql.execution.UnsafeFixedWidthAggregationMap.destructAndCreateExternalSorter(UnsafeFixedWidthAggregationMap.java:248)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage7.agg_doConsume_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage7.agg_doAggregateWithKeys_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage7.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$12$$anon$2.hasNext(WholeStageCodegenExec.scala:633)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
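The trace shows the task dying while UnsafeExternalSorter spills sort data to the executor's local disk during the shuffle, not while writing to S3. A rough back-of-envelope of the shuffle volume, with purely illustrative numbers (the vector count and dimensionality below are assumptions, not measurements): approxSimilarityJoin explodes each row once per hash table before joining, so with numHashTables=30 each side of the join shuffles roughly 30x the input data.

# Illustrative estimate only; n_vectors and dim are assumed values.
n_vectors = 1_000_000   # "millions of unique features"
dim = 512               # assumed feature dimensionality
num_hash_tables = 30    # from the LSH config above

bytes_per_vector = dim * 4  # float32
per_side = n_vectors * num_hash_tables * bytes_per_vector
print(f"~{per_side / 1e9:.1f} GB shuffled per side of the join")  # ~61.4 GB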

Submit script:

sudo -i spark-submit --verbose --deploy-mode cluster --num-executors 11 --executor-cores 5 --executor-memory 8G --driver-cores 5 --driver-memory 8G --conf spark.dynamicAllocation.enabled=false --conf spark.yarn.executor.memoryOverhead=2048 /home/hadoop/filename.py

In some cases the executors die with a SIGTERM signal, which I can see in the log files. What am I doing wrong here?

...