Spark version 2.3.2
EMR 5.19.0
8 executors
5 cores per executor
What I am trying to do:
1. There are millions of unique features (.npy files) in S3.
2. Load all the numpy feature vectors with Spark (a minimal per-file sketch follows this list).
3. Convert them into a Spark DataFrame whose schema is the feature name (string) and the feature vector (VectorUDT).
4. Then run LSH with Spark between the DataFrames.
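For context, this is roughly how one feature file's payload is expected to map to a single row, assuming each file stores a flat little-endian float32 buffer (the file name and values here are made up for illustration):
import numpy as np
from pyspark.ml.linalg import Vectors

# Hypothetical payload of one feature file: a flat little-endian float32 buffer.
raw_bytes = np.arange(4, dtype='<f4').tobytes()

# Same conversion the Spark job applies to each (path, bytes) pair from binaryFiles():
# the file stem becomes the image id, the bytes become a DenseVector.
image_id = "s3://input_path/img_0001.npy".split("/")[-1].split(".")[0]
feature = Vectors.dense(np.fromstring(raw_bytes, dtype='<f4'))
print(image_id, feature)  # -> img_0001 and a 4-element DenseVector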
Code:
import numpy as np
from pyspark.sql import SparkSession, SQLContext
import pyspark.sql.functions as F
from pyspark.sql.types import StringType, StructField, StructType
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.ml.feature import BucketedRandomProjectionLSH
spark = SparkSession \
    .builder \
    .appName('test-app') \
    .getOrCreate()
sc = spark.sparkContext
sqlContext = SQLContext(sc)
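# Read every .npy object under the S3 prefix as (path, raw bytes); the file stem
# becomes the image id and the bytes are interpreted as a little-endian float32 vector.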
features_rdd = sc. \
    binaryFiles("s3://input_path/"). \
    map(
        lambda x: (
            x[0].split("/")[-1].split(".")[0],
            Vectors.dense(np.fromstring(x[1], dtype='<f4'))
        )
    )
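# Explicit schema: image_id as a string plus the feature vector as VectorUDT.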
FEATURES_RAW_SCHEMA = StructType([
    StructField("image_id", StringType(), True),
    StructField("feature", VectorUDT(), True)
])
features_df = sqlContext.createDataFrame(features_rdd, FEATURES_RAW_SCHEMA)
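# Bucketed random-projection LSH (Euclidean distance) with 30 hash tables and
# bucket length 1000, fitted on the feature column.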
brp = BucketedRandomProjectionLSH(inputCol="feature", outputCol="hashes", seed=12345,
                                  numHashTables=30, bucketLength=1000)
model = brp.fit(features_df)
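# Approximate similarity self-join: keep image pairs whose Euclidean distance is
# below 50.0, then write the pairs to S3 as JSON.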
all_pairs_df = model.approxSimilarityJoin(features_df, features_df, 50.0, distCol="similarity_score").select(
    F.col("datasetA.image_id").alias("image_id1"),
    F.col("datasetB.image_id").alias("image_id2"),
    F.col("similarity_score")
)
all_pairs_df.write.mode("overwrite").json("s3a://output")
Error:
19/01/10 11:20:11 WARN TaskSetManager: Lost task 120.0 in stage 3.0 (TID 11, ip-ip1.ec2.internal, executor 5): java.io.IOException: No space left on device
at java.io.FileOutputStream.writeBytes(Native Method)
at java.io.FileOutputStream.write(FileOutputStream.java:326)
at org.apache.spark.storage.TimeTrackingOutputStream.write(TimeTrackingOutputStream.java:58)
at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126)
at net.jpountz.lz4.LZ4BlockOutputStream.flushBufferedData(LZ4BlockOutputStream.java:220)
at net.jpountz.lz4.LZ4BlockOutputStream.write(LZ4BlockOutputStream.java:173)
at org.apache.spark.storage.DiskBlockObjectWriter.write(DiskBlockObjectWriter.scala:252)
at org.apache.spark.util.collection.unsafe.sort.UnsafeSorterSpillWriter.write(UnsafeSorterSpillWriter.java:133)
at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.spillIterator(UnsafeExternalSorter.java:498)
at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.spill(UnsafeExternalSorter.java:222)
at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.createWithExistingInMemorySorter(UnsafeExternalSorter.java:111)
at org.apache.spark.sql.execution.UnsafeKVExternalSorter.<init>(UnsafeKVExternalSorter.java:156)
at org.apache.spark.sql.execution.UnsafeFixedWidthAggregationMap.destructAndCreateExternalSorter(UnsafeFixedWidthAggregationMap.java:248)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage7.agg_doConsume_0$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage7.agg_doAggregateWithKeys_0$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage7.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$12$$anon$2.hasNext(WholeStageCodegenExec.scala:633)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Submit script:
sudo -i spark-submit --verbose --deploy-mode cluster --num-executors 11 --executor-cores 5 --executor-memory 8G --driver-cores 5 --driver-memory 8G --conf spark.dynamicAllocation.enabled=false --conf spark.yarn.executor.memoryOverhead=2048 /home/hadoop/filename.py
In some cases the executors die with a SIGTERM signal, which I can see in the log files. What am I doing wrong here?