Спарк трансляция не удалась - PullRequest
0 голосов
/ 11 сентября 2018

Я новичок в поиске и пытаюсь отфильтровать один RDD на основе другого, как описано здесь .

Мои данные фильтра находятся в файле CSV на S3. Этот CSV-файл имеет размер 1,7 ГБ и содержит ~ 100 миллионов строк. Каждая строка имеет уникальный идентификатор длиной 10 символов. Мой план состоит в том, чтобы извлечь эти идентификаторы из этого файла CSV в набор в памяти, затем передать этот набор и использовать его для фильтрации другого СДР.

Мой код выглядит примерно так:

val sparkContext: SparkContext = new SparkContext()

val filterSet = sparkContext
  .textFile("s3://.../filter.csv") // this is the 1.7GB csv file
  .map(_.split(",")(0)) // each string here has exactly 10 chars (A-Z|0-9)
  .collect()
  .toSet // ~100M 10 char long strings in set.

val filterSetBC = sparkContext.broadcast(filterSet) // THIS LINE IS FAILING

val otherRDD = ...

otherRDD
  .filter(item => filterSetBC.value.contains(item.id))
  .saveAsTextFile("s3://...")

Я выполняю этот код в AWS EMR на 10 m4.2xlarge (16 vCore, 32 ГБ памяти) экземплярах EC2 и получаю ошибку ниже.

18/09/06 17:15:33 INFO UnifiedMemoryManager: Will not store broadcast_2 as the required space (16572507620 bytes) exceeds our memory limit (13555256524 bytes)
18/09/06 17:15:33 WARN MemoryStore: Not enough space to cache broadcast_2 in memory! (computed 10.3 GB so far)
18/09/06 17:15:33 INFO MemoryStore: Memory use = 258.6 KB (blocks) + 1024.0 KB (scratch space shared across 1 tasks(s)) = 1282.6 KB. Storage limit = 12.6 GB.
18/09/06 17:15:33 WARN BlockManager: Persisting block broadcast_2 to disk instead.
18/09/06 17:18:54 WARN BlockManager: Putting block broadcast_2 failed due to exception java.lang.ArrayIndexOutOfBoundsException: 1073741865.
18/09/06 17:18:54 WARN BlockManager: Block broadcast_2 could not be removed as it was not found on disk or in memory
Exception in thread "main" java.lang.ArrayIndexOutOfBoundsException: 1073741865
    at com.esotericsoftware.kryo.util.IdentityObjectIntMap.clear(IdentityObjectIntMap.java:382)
    at com.esotericsoftware.kryo.util.MapReferenceResolver.reset(MapReferenceResolver.java:65)
    at com.esotericsoftware.kryo.Kryo.reset(Kryo.java:865)
    at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:630)
    at org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:241)
    at org.apache.spark.serializer.SerializationStream.writeAll(Serializer.scala:140)
    at org.apache.spark.serializer.SerializerManager.dataSerializeStream(SerializerManager.scala:174)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1$$anonfun$apply$7.apply(BlockManager.scala:1101)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1$$anonfun$apply$7.apply(BlockManager.scala:1099)
    at org.apache.spark.storage.DiskStore.put(DiskStore.scala:68)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1099)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1083)
    at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1018)
    at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1083)
    at org.apache.spark.storage.BlockManager.putIterator(BlockManager.scala:841)
    at org.apache.spark.storage.BlockManager.putSingle(BlockManager.scala:1404)
    at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:123)
    at org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:88)
    at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
    at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:62)
    at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1482)

Насколько я понимаю из журналов, набор, который я пытаюсь транслировать, составляет около ~ 15 ГБ. Обычно символы 100Mx10 составляют ~ 1 ГБ, но с некоторыми издержками Java я бы ожидал, что они будут ~ 5-6 ГБ.

Вопрос 1: Почему мои данные так велики? Как я могу минимизировать это?

Тем не менее я настроил своих исполнителей так, чтобы они занимали 22 ГБ (память исполнителя) + 2 ГБ (spark.executor.memoryOverhead).

Вопрос 2: Почему spark сообщает, что он превышает ограничение памяти (12,6 ГБ)? Откуда берется этот предел в 12,6 ГБ?

Наверное, я ужасно напутал с параметрами spark-submit. Вот они:

--deploy-mode cluster 
--class com.example.MySparkJob
--master yarn
--driver-memory 24G
--executor-cores 15
--executor-memory 22G
--num-executors 9
--deploy-mode client
--conf spark.default.parallelism=1200
--conf spark.speculation=true
--conf spark.rdd.compress=true
--conf spark.files.fetchTimeout=180s
--conf spark.network.timeout=300s
--conf spark.yarn.max.executor.failures=5000
--conf spark.dynamicAllocation.enabled=true   // also tried without this parameter, no changes
--conf spark.driver.maxResultSize=0
--conf spark.executor.memoryOverhead=2G
--conf spark.serializer=org.apache.spark.serializer.KryoSerializer
--conf spark.kryo.registrator=com.example.MyKryoRegistrator
--driver-java-options -XX:+UseCompressedOops

1 Ответ

0 голосов
/ 13 февраля 2019

1st Пожалуйста, не назначайте такую ​​огромную память драйвера 4 ГБ достаточно, 2-ой ядро ​​Executor 15 - это путь к огромному 3-4 - достаточно (это даст больше исполнителя вместо нескольких) 3-е, если у вас больше увеличение памятиExecutor от 9 до 45 (если нет, то Executor 18 и Executor men до 16)

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...