передача данных из stdin и stderr в stdin другого процесса - PullRequest
1 голос
/ 01 февраля 2020

Это действительно следствие из:

Dart: Как передавать данные от одного процесса другому через потоки

Я использую dart для порождения двух процессов .

Давайте назовем эти два процесса "lhs" и "rhs". (lhs - левая сторона и rhs - правая сторона).

Первый процесс (lhs) записывает в stdout и stderr. Мне нужно передать все данные из первого процесса (lhs) в stdin второго процесса (rhs).

В вышеупомянутом переполнении стека ответом было использование метода pipe для потоковой передачи данных из lhs.stdout в rhs.stdin.

Учитывая, что теперь я хочу записать данные из обоих потоков lhs (stdout и stderr), метод pipe не работает, поскольку он поддерживает только один поток, и вы можете ' t вызываем pipe дважды в sthin rhs (выдается сообщение о том, что вы не можете дважды вызвать addStream).

Итак, я попробовал следующий код, который, кажется, частично работает, но я вижу только первый символ из потока lhs stderr, а затем все завершается (методы onDone вызываются в обоих потоках lhs).

Некоторые подробности, помогающие понять, что здесь происходит.

Ниже code 'lhs' - это вызов 'dart --version'. Когда dart записывает свою строку версии, он записывает ее в stderr, а в stdout ничего не записывается.

Я использую 'head' в качестве второго процесса - 'rhs'. Его работа состоит в том, чтобы просто получить объединенный вывод stdout и stderr от lhs и распечатать его на консоли.

Вывод из прогона приведенного ниже кода:

lhs exitCode=0
done
listen: stderr
listen: stderr written
done err
import 'dart:async';
import 'dart:cli';
import 'dart:io';

Future<void> main() async {

  var dart = start('dart', ['--version']);
  var head = start('head', ['-n', '5']);

  pipeTo(dart, head);

}

void pipeTo(Future<Process> lhs, Future<Process> rhs) {
  var complete = Completer<void>();

  // wait for the lhs and rhs processes to
  // start and then start piping lhs
  // output to the rhs input.
  lhs.then((lhsProcess) {
    rhs.then((rhsProcess) {
      // write stdout from lhs to stdin of rhs
      lhsProcess.stdout.listen((datum) {
        print('listen');
        rhsProcess.stdin.add(datum);
        print('listen written');
      }
      , onDone: () {
        print('done');
        complete.complete();
      }
      , onError: (Object e, StackTrace s) =>
              print('onError $e')
      ,cancelOnError: true);

      // write stderr from lhs to stdin of rhs.
      lhsProcess.stderr.listen((datum) {
        print('listen: stderr');
        rhsProcess.stdin.add(datum);
        print('listen: stderr written');
      }
      , onDone: () {
        print('done err');
        if (!complete.isCompleted) complete.complete();
      }
      , onError: (Object e, StackTrace s) =>
              print('onError $e')
      , cancelOnError: true);

      lhsProcess.exitCode.then((exitCode) {
        print('lhs exitCode=$exitCode');
      });

      rhsProcess.exitCode.then((exitCode) {
        print('rhs exitCode=$exitCode');
      });
    });
  });

  waitFor(complete.future);
}

Future<Process> start(String command, List<String> args) async {
  var process = Process.start(
    command,
    args,
  );
  return process;
}


Ответы [ 2 ]

0 голосов
/ 01 февраля 2020
Future<void> main() async {
  var dart = start('cat', ['/var/log/syslog']);
  var head = start('head', ['-n', '5']);

  await pipeTo2(dart, head);

}

Future<void> pipeTo2(Future<Process> lhs, Future<Process> rhs) async {
  // wait for both processes to start
  var lhsProcess = await lhs;
  var rhsProcess = await rhs;

  // send the lhs stdout to the rhs stdin
  lhsProcess.stdout.listen(rhsProcess.stdin.add);

  // send the lhs stderr to the rhs stdin
  lhsProcess.stderr.listen(rhsProcess.stdin.add).onDone(() {
    rhsProcess.stdin.close();
  });

  // send the rhs stdout and stderr to the console
  rhsProcess.stdout.listen(stdout.add);
  rhsProcess.stderr.listen(stderr.add);

  // wait for rhs to finish.
  // if it finishes before the lhs does we will get
  // a broken pipe error which is fine so we can 
  // suppress it.
  await rhsProcess.stdin.done.catchError(
    (Object e) {
      // forget broken pipe after rhs terminates before lhs
    },
    test: (e) => e is SocketException && e.osError.message == 'Broken pipe',
  );
}
0 голосов
/ 01 февраля 2020

Я думаю, что ваше решение использовать listen() для пересылки событий на rhsProcess.stdin будет работать. Вы можете очистить его с помощью await вместо обратных вызовов then() и удалить Completer. Вы можете заставить pipeTo() вернуть Future<void> и, если хотите, использовать waitFor(pipeTo()).

Вот сокращенный пример:

import 'dart:async';
import 'dart:convert';
import 'dart:io';

void main() {
  var dart = Process.start('dart', ['--version']);
  var head = Process.start('head', ['-n', '1']);
  pipeTo(dart, head);
}

Future<void> pipeTo(Future<Process> lhs, Future<Process> rhs) async {
  var lhsProcess = await lhs;
  var rhsProcess = await rhs;
  lhsProcess.stdout.listen(rhsProcess.stdin.add);
  lhsProcess.stderr.listen(rhsProcess.stdin.add);
  rhsProcess.stdout.transform(utf8.decoder).listen(print);
}

Примечание: в противном случае я передаю -n 1 в голову программа никогда не завершается.

...