Приложения для параллельной потоковой искры на YARN EMR - PullRequest
0 голосов
/ 26 октября 2019

Я столкнулся с проблемой при попытке запустить параллельные задания Spark Streaming на EMR. YARN настроен на использование планировщика емкости, настроены 3 очереди A, B, C.

  1. Я отправляю первый поток job A в queue A, и он работает нормально.
  2. Через пару минут я отправляю второй job B в queue B.

Как только я отправляю job B, job A прекращает работу, и на консоли пряжи EMR появляется всплеск ожидающих контейнеров, и job B начинает работать нормально.

Оба приложения отправляются в правильные очереди, как и ожидалось, и работают хорошо в пределах очереди.

Вот моя команда отправки искры.

spark-submit --name "A" \
    --master yarn \
    --queue A \
    --deploy-mode client \
    --executor-cores 2 \
    --driver-memory 2G \
    --executor-memory 1G \
    --conf spark.dynamicAllocation.enabled=true \
    --conf spark.shuffle.service.enabled=true \
    --conf spark.dynamicAllocation.minExecutors=1 \
    --conf spark.dynamicAllocation.maxExecutors=2 \
    --conf spark.dynamicAllocation.initialExecutors=1 \
    --conf spark.yarn.submit.waitAppCompletion=false 
    --class test test.jar

Ход выполнения запроса потоковой зажигания продолжает обновляться даже после того, как задание A застряло:

{"Event": "org.apache.spark.sql.streaming.StreamingQueryListener $QueryProgressEvent "," progress ": {" id ":" 047d9837-c493-4adf-b8ff-c916ed67461f "," runId ":" f02114ce-30a3-4678-bb48-a32b68d853be "," name ": null," timestamp ":«2019-10-26T07: 03: 08.382Z», «batchId»: 184, «durationMs»: {«triggerExecution»: 0, «getOffset»: 0}, «eventTime»: {}, «stateOperators»: [], "sources": [{"description": "MQTTStreamSource [brokerUrl: tcp: //mqtt-tap.com: 1883, тема: monitor / tenant / + clientId: stage]", "startOffset": "182", "endOffset ":" 182 "," numInputRows ": 0," inputRowsPerSecond ": 0.0," processingRowsPerSecond ":" NaN "}]," sink ": {" description ":" ForeachSink "}}}

Обе работы могут нормально работать в локальном режиме. Я проверил логи менеджера узлов и не смог найти никаких ошибок. И проблема возникает только в режиме клиент / кластер пряжи. Подскажите пожалуйста как отладить эту проблему.

Редактировать: добавлены настройки планировщика.

<!--
  Licensed under the Apache License, Version 2.0 (the "License");
  you may not use this file except in compliance with the License.
  You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

  Unless required by applicable law or agreed to in writing, software
  distributed under the License is distributed on an "AS IS" BASIS,
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  See the License for the specific language governing permissions and
  limitations under the License. See accompanying LICENSE file.
-->
<configuration>

  <property>
    <name>yarn.scheduler.capacity.maximum-applications</name>
    <value>10000</value>
    <description>
      Maximum number of applications that can be pending and running.
    </description>
  </property>

  <property>
    <name>yarn.scheduler.capacity.maximum-am-resource-percent</name>
    <value>0.5</value>
    <description>
      Maximum percent of resources in the cluster which can be used to run 
      application masters i.e. controls number of concurrent running
      applications.
    </description>
  </property>

  <property>
    <name>yarn.scheduler.capacity.resource-calculator</name>
    <value>org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator</value>
    <description>
      The ResourceCalculator implementation to be used to compare 
      Resources in the scheduler.
      The default i.e. DefaultResourceCalculator only uses Memory while
      DominantResourceCalculator uses dominant-resource to compare 
      multi-dimensional resources such as Memory, CPU etc.
    </description>
  </property>

  <property>
    <name>yarn.scheduler.capacity.root.queues</name>
    <value>alpha,beta,default</value>
    <description>
      The queues at the this level (root is the root queue).
    </description>
  </property>

  <property>
    <name>yarn.scheduler.capacity.root.alpha.capacity</name>
    <value>50</value>
    <description>Default queue target capacity.</description>
  </property>

  <property>
    <name>yarn.scheduler.capacity.root.beta.capacity</name>
    <value>30</value>
    <description>Default queue target capacity.</description>
  </property>

  <property>
    <name>yarn.scheduler.capacity.root.default.capacity</name>
    <value>20</value>
    <description>Default queue target capacity.</description>
  </property>

  <property>
    <name>yarn.scheduler.capacity.root.default.user-limit-factor</name>
    <value>1</value>
    <description>
      Default queue user limit a percentage from 0.0 to 1.0.
    </description>
  </property>

  <property>
    <name>yarn.scheduler.capacity.root.alpha.user-limit-factor</name>
    <value>1</value>
    <description>
      Default queue user limit a percentage from 0.0 to 1.0.
    </description>
  </property>

  <property>
    <name>yarn.scheduler.capacity.root.beta.user-limit-factor</name>
    <value>1</value>
    <description>
      Default queue user limit a percentage from 0.0 to 1.0.
    </description>
  </property>

  <property>
    <name>yarn.scheduler.capacity.root.default.maximum-capacity</name>
    <value>20</value>
    <description>
      The maximum capacity of the default queue. 
    </description>
  </property>

  <property>
    <name>yarn.scheduler.capacity.root.alpha.maximum-capacity</name>
    <value>50</value>
    <description>
      The maximum capacity of the default queue. 
    </description>
  </property>

  <property>
    <name>yarn.scheduler.capacity.root.beta.maximum-capacity</name>
    <value>30</value>
    <description>
      The maximum capacity of the default queue. 
    </description>
  </property>

  <property>
    <name>yarn.scheduler.capacity.root.default.state</name>
    <value>RUNNING</value>
    <description>
      The state of the default queue. State can be one of RUNNING or STOPPED.
    </description>
  </property>

  <property>
    <name>yarn.scheduler.capacity.root.alpha.state</name>
    <value>RUNNING</value>
    <description>
      The state of the default queue. State can be one of RUNNING or STOPPED.
    </description>
  </property>

    <property>
    <name>yarn.scheduler.capacity.root.beta.state</name>
    <value>RUNNING</value>
    <description>
      The state of the default queue. State can be one of RUNNING or STOPPED.
    </description>
  </property>

  <property>
    <name>yarn.scheduler.capacity.root.default.acl_submit_applications</name>
    <value>*</value>
    <description>
      The ACL of who can submit jobs to the default queue.
    </description>
  </property>

  <property>
    <name>yarn.scheduler.capacity.root.default.acl_administer_queue</name>
    <value>*</value>
    <description>
      The ACL of who can administer jobs on the default queue.
    </description>
  </property>

  <property>
    <name>yarn.scheduler.capacity.node-locality-delay</name>
    <value>40</value>
    <description>
      Number of missed scheduling opportunities after which the CapacityScheduler 
      attempts to schedule rack-local containers. 
      Typically this should be set to number of nodes in the cluster, By default is setting 
      approximately number of nodes in one rack which is 40.
    </description>
  </property>

  <property>
    <name>yarn.scheduler.capacity.queue-mappings</name>
    <value></value>
    <description>
      A list of mappings that will be used to assign jobs to queues
      The syntax for this list is [u|g]:[name]:[queue_name][,next mapping]*
      Typically this list will be used to map users to queues,
      for example, u:%user:%user maps all users to queues with the same name
      as the user.
    </description>
  </property>

  <property>
    <name>yarn.scheduler.capacity.queue-mappings-override.enable</name>
    <value>false</value>
    <description>
      If a queue mapping is present, will it override the value specified
      by the user? This can be used by administrators to place jobs in queues
      that are different than the one specified by the user.
      The default is false.
    </description>
  </property>

  <property>
    <name>yarn.scheduler.capacity.per-node-heartbeat.maximum-offswitch-assignments</name>
    <value>1</value>
    <description>
      Controls the number of OFF_SWITCH assignments allowed
      during a node's heartbeat. Increasing this value can improve
      scheduling rate for OFF_SWITCH containers. Lower values reduce
      "clumping" of applications on particular nodes. The default is 1.
      Legal values are 1-MAX_INT. This config is refreshable.
    </description>
  </property>

  <property>
    <name>yarn.scheduler.capacity.root.accessible-node-labels</name>
    <value>*</value>
  </property>

  <property>
    <name>yarn.scheduler.capacity.root.accessible-node-labels.CORE.capacity</name>
    <value>100</value>
  </property>

  <property>
    <name>yarn.scheduler.capacity.root.default.accessible-node-labels</name>
    <value>*</value>
  </property>

  <property>
    <name>yarn.scheduler.capacity.root.default.accessible-node-labels.CORE.capacity</name>
    <value>20</value>
  </property>

  <property>
    <name>yarn.scheduler.capacity.root.alpha.accessible-node-labels.CORE.capacity</name>
    <value>50</value>
  </property>

  <property>
    <name>yarn.scheduler.capacity.root.beta.accessible-node-labels.CORE.capacity</name>
    <value>30</value>
  </property>

</configuration>
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...