Как объединить два потока ввода-вывода в Python? - PullRequest
1 голос
/ 28 марта 2019

Я создал оболочку для команды Spark-Submit, чтобы можно было генерировать события в реальном времени путем анализа журналов.Цель состоит в том, чтобы создать интерфейс реального времени, показывающий подробный ход выполнения задания Spark.

Таким образом, оболочка будет выглядеть следующим образом:

  submitter = SparkSubmitter()
  submitter.submit('/path/to/spark-code.py')
  for log_event in submitter:
    if log_event:
      print('Event:', log_event)

И выходные данные будут выглядеть следующим образом:

  Event: StartSparkContextEvent()
  Event: StartWorkEvent()
  Event: FinishWorkEvent()
  Event: StopSparkContextEvent()

Внутри класса SparkSubmitter запускает команду spark-submit в качестве процесса subprocess.Popen, а затем выполняет итерацию по потоку stdout и возвращает события, анализируя журналы, сгенерированные процессом, например:

  class SparkSubmitter():
    def submit(self, path):
        command = self.build_spark_submit_command(path)
      self.process = Popen(command, stdout=PIPE, stderr=PIPE)

    def __iter__(self):
        return self

    def __next__(self):
        # note: this is a IO-Blocking command
        log = self.process.stdout.readline().decode('utf-8') 
      return self.parse_log_and_return_event(log)

Эта реализация хорошо работает с автономным кластером Spark.Но у меня проблема при работе на кластере пряжи.

В кластере пряжи «Журналы, связанные с искрой», приходят в stderr вместо stdout.Так что мой класс не может анализировать журналы, генерируемые искрой, потому что он только пытается прочитать stdout.

Вопрос 1 : можно ли читать stdout и stderr Попена как один поток?

Вопрос 2 : Поскольку оба stdout и stderr обаПотоки, возможно ли объединить оба потока и прочитать их как один?

Вопрос 3 : можно ли перенаправить все журналы только на стандартный вывод?

1 Ответ

2 голосов
/ 28 марта 2019

Ответ на все 3 ваших вопроса - да, вы можете использовать stderr=subprocess.STDOUT в качестве аргумента для Popen, чтобы перенаправить вывод с stderr на stdout:

self.process = Popen(command, stdout=PIPE, stderr=subprocess.STDOUT)
...