Как определить оптимальное количество случайных разделов в Spark - PullRequest
0 голосов
/ 04 апреля 2020

Я выполняю потоковую работу с искровым структурированием (отскакивает каждый день) в EMR. Я получаю ошибку OOM в моем приложении после нескольких часов выполнения и меня убивают. Ниже приведены мои конфигурации и код spark SQL. Я новичок в Spark и мне нужен ваш ценный вклад.

EMR имеет 10 экземпляров с 16 ядрами и 64 ГБ памяти.

Аргументы Spark-Submit:

   num_of_executors: 17
   executor_cores: 5
   executor_memory: 19G
   driver_memory: 30G

Задание читает входные данные в виде микропакетов из Kafka с интервалом в 30 секунд. Среднее число считываемых строк в пакете составляет 90 тыс.

  spark.streaming.kafka.maxRatePerPartition: 4500
  spark.streaming.stopGracefullyOnShutdown: true
  spark.streaming.unpersist: true
  spark.streaming.kafka.consumer.cache.enabled: true
  spark.hadoop.fs.s3.maxRetries: 30 
  spark.sql.shuffle.partitions: 2001

Код агрегации Spark SQL:

dataset.groupBy(functions.col(NAME),functions.window(functions.column(TIMESTAMP_COLUMN),30))
            .agg(functions.concat_ws(SPLIT, functions.collect_list(DEPARTMENT)).as(DEPS))
            .select(NAME,DEPS)
            .map((row) -> {
              Map<String, Object> map = Maps.newHashMap();
              map.put(NAME, row.getString(0));
              map.put(DEPS, row.getString(1));
              return new KryoMapSerializationService().serialize(map);
            }, Encoders.BINARY());

Некоторые журналы из драйвера:

20/04/04 13:10:51 INFO TaskSetManager: Finished task 1911.0 in stage 1041.0 (TID 1052055) in 374 ms on <host> (executor 3) (1998/2001)
20/04/04 13:10:52 INFO TaskSetManager: Finished task 1925.0 in stage 1041.0 (TID 1052056) in 411 ms on  <host> (executor 3) (1999/2001)
20/04/04 13:10:52 INFO TaskSetManager: Finished task 1906.0 in stage 1041.0 (TID 1052054) in 776 ms on  <host> (executor 3) (2000/2001)
20/04/04 13:11:04 INFO YarnSchedulerBackend$YarnDriverEndpoint: Disabling executor 3.
20/04/04 13:11:04 INFO DAGScheduler: Executor lost: 3 (epoch 522)
20/04/04 13:11:04 INFO BlockManagerMasterEndpoint: Trying to remove executor 3 from BlockManagerMaster.
20/04/04 13:11:04 INFO BlockManagerMasterEndpoint: Removing block manager BlockManagerId(3,  <host>, 38533, None)
20/04/04 13:11:04 INFO BlockManagerMaster: Removed 3 successfully in removeExecutor
20/04/04 13:11:04 INFO YarnAllocator: Completed container container_1582797414408_1814_01_000004 on host:  <host> (state: COMPLETE, exit status: 143)

И, кстати, я использую collectasList в своем коде forEachBatch

  List<Event> list = dataset.select("value")
        .selectExpr("deserialize(value) as rows")
        .select("rows.*")
        .selectExpr(NAME, DEPS)
        .as(Encoders.bean(Event.class))
        .collectAsList();

1 Ответ

0 голосов
/ 14 апреля 2020

С этими настройками у вас могут возникнуть собственные проблемы.

   num_of_executors: 17
   executor_cores: 5
   executor_memory: 19G
   driver_memory: 30G

Вы в основном создаете здесь дополнительные контейнеры, чтобы перемещаться между ними. Вместо этого начните с чего-то вроде 10 исполнителей, 15 ядер, 60 г памяти. Если это работает, то вы можете немного поиграть, чтобы попытаться оптимизировать производительность. Я обычно пытаюсь разделить свои контейнеры пополам на каждом шаге (но мне также не нужно было делать это с версии 2.0).

Пусть Spark SQL сохранит значение по умолчанию на уровне 200. Чем больше вы разбиваете это, тем больше математики вы делаете, чтобы Spark вычислял перемешивание. Во всяком случае, я бы попытался go с тем же числом параллелизма, что и у ваших исполнителей, поэтому в данном случае просто 10. Когда вышла версия 2.0, вы бы настроили запросы улья. Создание сложного задания, которое нужно разбить, возлагает всю нагрузку на мастера.

Использование наборов данных и кодирования, как правило, также не так эффективно, как при работе с прямыми операциями DataFrame. Я обнаружил, что производительность этого фактора для операций с кадрами данных значительно увеличилась.

...