Spark задача висит в [GC (Ошибка распределения)] - PullRequest
0 голосов
/ 02 октября 2019

РЕДАКТИРОВАТЬ: Примечание: Исполнитель обычно отправляет сообщение [GC (Allocation Failure) ]. Он запускает его, потому что пытается выделить память для исполнителя, но исполнитель заполнен, поэтому он попытается освободить место при загрузке чего-то нового для исполнителя. Если ваш Исполнитель делает это в цикле, это может означать, что вы пытаетесь загрузить в него слишком большого размера.

Я использую Spark 2.2, Scala 2.11 на AWS EMR 5.8.0

Я пытаюсь запустить операцию count на наборе данных, который не завершается. Что расстраивает, так это то, что он висит только на одном конкретном файле. Я запускаю эту работу в другом файле из S3, нет проблем - она ​​полностью завершается. Исходный CSV-файл сам по себе равен @ 18 ГБ, и мы запускаем на нем преобразование, чтобы превратить исходный CSV-файл в столбец структуры, добавив в него один дополнительный столбец.

Основными ведомыми устройствами моей среды являются 8 экземпляров, каждый из которых:

r3.2xlarge
16 vCore, 61 GiB memory, 160 SSD GB storage

Настройки сеанса My Spark:

implicit val spark = SparkSession
      .builder()
      .appName("MyApp")
      .master("yarn")
      .config("spark.speculation","false")
      .config("hive.metastore.uris", s"thrift://$hadoopIP:9083")
      .config("hive.exec.dynamic.partition", "true")
      .config("hive.exec.dynamic.partition.mode", "nonstrict")
      .config("mapreduce.fileoutputcommitter.algorithm.version", "2")
      .config("spark.dynamicAllocation.enabled", false)
      .config("spark.executor.cores", 5)
      .config("spark.executors.memory", "18G")
      .config("spark.yarn.executor.memoryOverhead", "2G")
      .config("spark.driver.memory", "18G")
      .config("spark.executor.instances", 23)
      .config("spark.default.parallelism", 230)
      .config("spark.sql.shuffle.partitions", 230)
      .enableHiveSupport()
      .getOrCreate()

Данные поступают из файла CSV:

val ds = spark.read
          .option("header", "true")
          .option("delimiter", ",")
          .schema(/* 2 cols: [ValidatedNel, and a stuct schema */)
          .csv(sourceFromS3)
          .as(MyCaseClass)

val mappedDs:Dataset[ValidatedNel, MyCaseClass] = ds.map(...)

mappedDs.repartition(230)

val count = mappedDs.count() // never finishes

Как и ожидалось, он раскручивает 230 задач, и229 финиш, кроме одного где-то посередине. См. Ниже - первая задача просто зависает навсегда, средняя задача завершается без проблем (хотя это и странно - размер записи / соотношение очень разные), а остальные 229 задач выглядят точно так же, как завершенная.

Index| ID |Attempt |Status|Locality Level|Executor ID / Host|                       Launch Time          |   Duration   |GC Time|Input Size / Records|Write Time | Shuffle Write Size / Records| Errors
110   117   0   RUNNING     RACK_LOCAL     11 / ip-XXX-XX-X-XX.uswest-2.compute.internal 2019/10/01 20:34:01    1.1 h   43 min     66.2 MB / 2289538                0.0 B / 0   
0     7     0   SUCCESS     PROCESS_LOCAL  9 / ip-XXX-XX-X-XXX.us-west-2.compute.internal 2019/10/01 20:32:10   1.0 s   16 ms      81.2 MB /293        5 ms         59.0 B / 1   <-- this task is odd, but finishes
1     8     0   SUCCESS     RACK_LOCAL      9 / ip-XXX-XX-X-XXX.us-west-2.compute.internal 2019/10/01 20:32:10  2.1 min     16 ms      81.2 MB /2894845        9 s          59.0 B / 1   <- the other tasks are all similar to this one

Проверяя стандартность зависающих заданий, я неоднократно вижу следующее: никогда не кончается:

2019-10-01T21:51:16.055+0000: [GC (Allocation Failure) 2019-10-01T21:51:16.055+0000: [ParNew: 10904K->0K(613440K), 0.0129982 secs]2019-10-01T21:51:16.068+0000: [CMS2019-10-01T21:51:16.099+0000: [CMS-concurrent-mark: 0.031/0.044 secs] [Times: user=0.17 sys=0.00, real=0.04 secs] 
 (concurrent mode failure): 4112635K->2940648K(4900940K), 0.4986233 secs] 4123539K->2940648K(5514380K), [Metaspace: 60372K->60372K(1103872K)], 0.5121869 secs] [Times: user=0.64 sys=0.00, real=0.51 secs] 

Еще одно замечание: прежде чем я вызову счетчик, я вызываю repartition(230) просто приотвызов count на Dataset[T] для обеспечения равного распределения данных

Что здесь происходит?

1 Ответ

1 голос
/ 02 октября 2019

Возможно, это связано с перекосом данных и / или проблемами с анализом данных. Обратите внимание, что проблемный раздел содержит значительно больше записей, чем успешно обработанный:

Input Size /  Records
66.2 MB / 2289538
81.2 MB /293

Я бы проверил, что все файлы разделов имеют примерно одинаковый размер и количество записей. Возможно, разделители строк и / или столбцов отключены либо в проблемных, либо в «хороших» файлах разделов (293 строки кажутся слишком маленькими для файла ~ 80 МБ).

...