Многопроцессорная обработка в python, приводящая к бесконечному выполнению - PullRequest
0 голосов
/ 01 января 2019

Я вижу странное поведение при выполнении моей программы.Позволь мне объяснить.

Я написал многопроцессорный класс.который идет так.

from multiprocessing import Process


class ProcessManager:

    def __init__(self, spark, logger):
        self.spark = spark
        self.logger = logger

    def applyMultiProcessExecution(self, func_arguments, targetFunction, iterableList):

        self.logger.info("Function Arguments : {}".format(func_arguments))
        jobs = []
        for x in iterableList:
            try:
                p = Process(target=targetFunction, args=(x,), kwargs=func_arguments)
                jobs.append(p)
                p.start()
            except:
                raise RuntimeError("Unable to create process for GL : {}".format(x))

        for job in jobs:
            job.join()

Теперь у меня есть метод детекции, который выглядит следующим образом

def detect(self, gl, inputFolder, modelFolder, outputFolder, readWriteUtils, region):

     # This reads data from inputFolder, modelFolder using readWriteUtils based on gl and region
     # Does computation over data
     # Writes data to outputFolder

Теперь я называю этот метод следующим образом.

pm = ProcessManager(spark=spark, logger=logger)
    pm.applyMultiProcessExecution(func_arguments=arguments,
                                  targetFunction= detect,
                                  iterableList=GL_LIST)

Эта операция запускается через кластер EMR с использованием шага spark-submit.

Теперь странное поведение.Иногда это выполняется идеально в течение 1 минуты.Иногда, это входит в бесконечную обработку, и когда я отменяю операцию, используя CTRL C, я вижу, что данные вычисляются, но сам процесс не закрывается.

Над моей искрой я вижу, как логи моего контроллера выглядят так.

2019-01-01T08:22:18.145Z INFO Ensure step 23 jar file command-runner.jar
2019-01-01T08:22:18.145Z INFO StepRunner: Created Runner for step 23
INFO startExec 'hadoop jar /var/lib/aws/emr/step-runner/hadoop-jars/command-runner.jar spark-submit --py-files /mnt/road-artifacts/ROAD.zip /mnt/road-artifacts/com/amazon/road/model-executor/PCAModelTestExecution.py --inputFolder=/tmp/split_data --modelFolder=/tmp/model --outputFolder=/tmp/output --region=NA'
INFO Environment:
  PATH=/sbin:/usr/sbin:/bin:/usr/bin:/usr/local/sbin:/opt/aws/bin
  LESS_TERMCAP_md=[01;38;5;208m
  LESS_TERMCAP_me=[0m
  HISTCONTROL=ignoredups
  LESS_TERMCAP_mb=[01;31m
  AWS_AUTO_SCALING_HOME=/opt/aws/apitools/as
  UPSTART_JOB=rc
  LESS_TERMCAP_se=[0m
  HISTSIZE=1000
  HADOOP_ROOT_LOGGER=INFO,DRFA
  JAVA_HOME=/etc/alternatives/jre
  AWS_DEFAULT_REGION=us-east-1
  AWS_ELB_HOME=/opt/aws/apitools/elb
  LESS_TERMCAP_us=[04;38;5;111m
  EC2_HOME=/opt/aws/apitools/ec2
  TERM=linux
  runlevel=3
  LANG=en_US.UTF-8
  AWS_CLOUDWATCH_HOME=/opt/aws/apitools/mon
  MAIL=/var/spool/mail/hadoop
  LESS_TERMCAP_ue=[0m
  LOGNAME=hadoop
  PWD=/
  LANGSH_SOURCED=1
  HADOOP_CLIENT_OPTS=-Djava.io.tmpdir=/mnt/var/lib/hadoop/steps/s-3INAXV6LAS9A4/tmp
  _=/etc/alternatives/jre/bin/java
  CONSOLETYPE=serial
  RUNLEVEL=3
  LESSOPEN=||/usr/bin/lesspipe.sh %s
  previous=N
  UPSTART_EVENTS=runlevel
  AWS_PATH=/opt/aws
  USER=hadoop
  UPSTART_INSTANCE=
  PREVLEVEL=N
  HADOOP_LOGFILE=syslog
  HOSTNAME=ip-172-32-0-233
  HADOOP_LOG_DIR=/mnt/var/log/hadoop/steps/s-3INAXV6LAS9A4
  EC2_AMITOOL_HOME=/opt/aws/amitools/ec2
  SHLVL=5
  HOME=/home/hadoop
  HADOOP_IDENT_STRING=hadoop
INFO redirectOutput to /mnt/var/log/hadoop/steps/s-3INAXV6LAS9A4/stdout
INFO redirectError to /mnt/var/log/hadoop/steps/s-3INAXV6LAS9A4/stderr
INFO Working dir /mnt/var/lib/hadoop/steps/s-3INAXV6LAS9A4
INFO ProcessRunner started child process 36375 :
hadoop    36375   6380  0 08:22 ?        00:00:00 /etc/alternatives/jre/bin/java -Xmx1000m -server -XX:OnOutOfMemoryError=kill -9 %p -Dhadoop.log.dir=/mnt/var/log/hadoop/steps/s-3INAXV6LAS9A4 -Dhadoop.log.file=syslog -Dhadoop.home.dir=/usr/lib/hadoop -Dhadoop.id.str=hadoop -Dhadoop.root.logger=INFO,DRFA -Djava.library.path=:/usr/lib/hadoop-lzo/lib/native:/usr/lib/hadoop/lib/native -Dhadoop.policy.file=hadoop-policy.xml -Djava.net.preferIPv4Stack=true -Djava.io.tmpdir=/mnt/var/lib/hadoop/steps/s-3INAXV6LAS9A4/tmp -Dhadoop.security.logger=INFO,NullAppender -Dsun.net.inetaddr.ttl=30 org.apache.hadoop.util.RunJar /var/lib/aws/emr/step-runner/hadoop-jars/command-runner.jar spark-submit --py-files /mnt/road-artifacts/ROAD.zip /mnt/road-artifacts/com/amazon/road/model-executor/PCAModelTestExecution.py --inputFolder=/tmp/split_data --modelFolder=/tmp/model --outputFolder=/tmp/output --region=NA
2019-01-01T08:22:22.152Z INFO HadoopJarStepRunner.Runner: startRun() called for s-3INAXV6LAS9A4 Child Pid: 36375
INFO Synchronously wait child process to complete : hadoop jar /var/lib/aws/emr/step-runner/hadoop-...
INFO Process still running
INFO Process still running
INFO Process still running

Я прочитал что-то, связанное с взаимоблокировкой, в многопроцессорной очереди, но это здесь не применимо, поскольку я не помещаю в очередь ничего, что нужно получить.Это выглядит довольно странно для меня, так как я не могу это объяснить.Кто-нибудь может что-то предложить?

...