Убедитесь, что две команды запущены перед запуском третьей - PullRequest
1 голос
/ 22 сентября 2019

У меня есть три команды для запуска, но я хотел бы убедиться, что две первые запускаются, прежде чем запустить третью.

В настоящее время он запускает A и B, а затем C.

  1. Я запускаю A и B в goroutines
  2. Я передаю их имя через chan, если нет stderr
  3. , функция main выдвигает имена, полученные через chan, всрез
  4. как только срез содержит все имена модулей A и B, он начинается C

Некоторый контекст

Я нахожусь впроцесс изучения горутинов и чан как любителя.Мне не ясно, как вывести exec.Command("foo", "bar").Run() надежным способом, пока он работает.Также не ясно, как обрабатывать ошибки, полученные каждым процессом через канал.

Причина, по которой мне нужно, чтобы A и B запускались до C, заключается в том, что A и B - это микро-сервисы graphql, C нужно, чтобы они выполнялись для того, чтобыполучите их схемы через HTTP и начните выполнять некоторую федерацию graphql (fka graphql stingching)

Несоответствия

  • При моем текущем подходе я буду знать, если A и Bя работаю, только если они что-то печатают.
  • Мне не нравится, что каждый последующий stdout будет попадать в оператор if, просто чтобы узнать, запущен ли процесс.
  • Моя ошибкаобработка не так чиста, как хотелось бы.

Вопрос

Как я мог бы иметь более надежный способ гарантировать, что A и Bработают, если они ничего не печатают и не выдают ошибок?

package main

import (
    "bufio"
    "fmt"
    "log"
    "os/exec"
    "reflect"
    "sort"
    "strings"
    "sync"
)

var wg sync.WaitGroup
var modulesToRun = []string{"micro-post", "micro-hello"}

func main() {
    // Send multiple values to chan
    // https://stackoverflow.com/a/50857250/9077800
    c := make(chan func() (string, error))

    go runModule([]string{"go", "run", "micro-post"}, c)  // PROCESS A
    go runModule([]string{"go", "run", "micro-hello"}, c) // PROCESS B

    modulesRunning := []string{}
    for {
        msg, err := (<-c)()
        if err != nil {
            log.Fatalln(err)
        }

        if strings.HasPrefix(msg, "micro-") && err == nil {
            modulesRunning = append(modulesRunning, msg)
            if CompareUnorderedSlices(modulesToRun, modulesRunning) {
                go runModule([]string{"go", "run", "micro-federation"}, c) // PROCESS C
            }
        }
    }

}

func runModule(commandArgs []string, o chan func() (string, error)) {
    cmd := exec.Command(commandArgs[0], commandArgs[1], commandArgs[2]+"/main.go")

    // Less verbose solution to stream output with io?
    // var stdBuffer bytes.Buffer
    // mw := io.MultiWriter(os.Stdout, &stdBuffer)
    // cmd.Stdout = mw
    // cmd.Stderr = mw

    c := make(chan struct{})
    wg.Add(1)

    // Stream command output
    // https://stackoverflow.com/a/38870609/9077800
    go func(cmd *exec.Cmd, c chan struct{}) {
        defer wg.Done()
        stdout, err := cmd.StdoutPipe()
        if err != nil {
            close(o)
            panic(err)
        }

        stderr, err := cmd.StderrPipe()
        if err != nil {
            close(o)
            panic(err)
        }

        <-c
        outScanner := bufio.NewScanner(stdout)
        for outScanner.Scan() {
            m := outScanner.Text()
            fmt.Println(commandArgs[2]+":", m)
            o <- (func() (string, error) { return commandArgs[2], nil })
        }

        errScanner := bufio.NewScanner(stderr)
        for errScanner.Scan() {
            m := errScanner.Text()
            fmt.Println(commandArgs[2]+":", m)
            o <- (func() (string, error) { return "bad", nil })
        }
    }(cmd, c)

    c <- struct{}{}
    cmd.Start()

    wg.Wait()
    close(o)
}

// CompareUnorderedSlices orders slices before comparing them
func CompareUnorderedSlices(a, b []string) bool {
    if len(a) != len(b) {
        return false
    }

    sort.Strings(a)
    sort.Strings(b)

    return reflect.DeepEqual(a, b)
}

1 Ответ

3 голосов
/ 22 сентября 2019

Об управлении процессом

Запуск процесса - это действие вызова двоичного пути с его аргументами.Он потерпит неудачу, если путь к корзине не найден или предоставлен синтаксис некоторых искаженных аргументов.

Как следствие, вы можете запустить процесс с успехом, но получите ошибку выхода из-за сбоя при выполнении.

Эти детали важно выяснить, нужно ли вам только запустить процесс, чтобы считать операцию успешной или копать дальше ее состояние и / или вывод.

В вашем коде кажется, что вы ждете первогострока stderr, которая будет напечатана, чтобы считать ее начатой, без учета содержимого, которое будет напечатано.

Это больше похоже на время ожидания, чтобы гарантировать инициализацию процесса.

Учтите, что запуск двоичного файла происходит намного быстрее по сравнению с выполнением его последовательности начальной загрузки.

О коде, ваши правила выхода неясны.Что удерживает main от выхода?

В текущем коде он завершится до того, как будет выполнен C, когда начнутся A и B (без каких-либо других случаев)

Ваша реализация параллелизма задания в mainне стандартныйОтсутствует цикл для сбора результатов, выхода и закрытия (chan).

Сигнатура chan неудобна, я бы предпочел использовать struct {Module string, Err error}

Функция runModule содержит ошибки.Он может закрыться (o), а другая подпрограмма может попытаться записать его.Если запуск не удастся, вы не вернете какой-либо сигнал об ошибке.

Некоторое решение может выглядеть следующим образом, рассматривайте его как многовариантное, и в зависимости от бинарного запуска могут / должны быть реализованы другие стратегии для обнаружения ошибок по стандартным FD.

package main

import (
    "bufio"
    "fmt"
    "log"
    "os"
    "os/exec"
    "strings"
    "sync"
    "time"
)

type cmd struct {
    Module string
    Cmd    string
    Args   []string
    Err    error
}

func main() {

    torun := []cmd{
        cmd{
            Module: "A",
            Cmd:    "ping",
            Args:   []string{"8.8.8.8"},
        },
        cmd{
            Module: "B",
            Cmd:    "ping",
            // Args:   []string{"8.8.8.8.9"},
            Args: []string{"8.8.8.8"},
        },
    }

    var wg sync.WaitGroup // use a waitgroup to ensure all concurrent jobs are done
    wg.Add(len(torun))

    out := make(chan cmd) // a channel to output cmd status

    go func() {
        wg.Wait()  //wait for the group to finish
        close(out) //  then close the signal channel
    }()

    // start the commands
    for _, c := range torun {
        // go runCmd(c, out, &wg)
        go runCmdAndWaitForSomeOutput(c, out, &wg)
    }

    // loop over the chan to collect errors
    // it ends when wg.Wait unfreeze and closes out
    for c := range out {
        if c.Err != nil {
            log.Fatalf("%v %v has failed with %v", c.Cmd, c.Args, c.Err)
        }
    }

    // here all commands started you can proceed further to run the last command
    fmt.Println("all done")
    os.Exit(0)
}

func runCmd(o cmd, out chan cmd, wg *sync.WaitGroup) {
    defer wg.Done()

    cmd := exec.Command(o.Cmd, o.Args...)

    if err := cmd.Start(); err != nil {
        o.Err = err // save err
        out <- o    // signal completion error
        return      // return to unfreeze the waitgroup wg
    }
    go cmd.Wait() // dont wait for command completion,
    // consider its done once the program started with success.

    // out <- o // useless as main look ups only for error
}

func runCmdAndWaitForSomeOutput(o cmd, out chan cmd, wg *sync.WaitGroup) {
    defer wg.Done()

    cmd := exec.Command(o.Cmd, o.Args...)

    stdout, err := cmd.StdoutPipe()
    if err != nil {
        o.Err = err // save err
        out <- o    // signal completion
        return      // return to unfreeze the waitgroup wg
    }
    stderr, err := cmd.StderrPipe()
    if err != nil {
        o.Err = err
        out <- o
        return
    }

    if err := cmd.Start(); err != nil {
        o.Err = err
        out <- o
        return
    }

    go cmd.Wait() // dont wait for command completion

    // build a concurrent fd's scanner

    outScan := make(chan error) // to signal errors detected on the fd

    var wg2 sync.WaitGroup
    wg2.Add(2) // the number of fds being watched

    go func() {
        defer wg2.Done()
        sc := bufio.NewScanner(stdout)
        for sc.Scan() {
            line := sc.Text()
            if strings.Contains(line, "icmp_seq") { // the OK marker
                return // quit asap to unfreeze wg2
            } else if strings.Contains(line, "not known") { // the nOK marker, if any...
                outScan <- fmt.Errorf("%v", line)
                return // quit  to unfreeze wg2
            }
        }
    }()

    go func() {
        defer wg2.Done()
        sc := bufio.NewScanner(stderr)
        for sc.Scan() {
            line := sc.Text()
            if strings.Contains(line, "icmp_seq") { // the OK marker
                return // quit asap to unfreeze wg2
            } else if strings.Contains(line, "not known") { // the nOK marker, if any...
                outScan <- fmt.Errorf("%v", line) // signal error
                return                            // quit to unfreeze wg2
            }
        }
    }()

    go func() {
        wg2.Wait() // consider that if the program does not output anything,
        // or never prints ok/nok, this will block forever
        close(outScan) // close the chan so the next loop is finite
    }()

    // - simple timeout less loop
    // for err := range outScan {
    //  if err != nil {
    //      o.Err = err // save the execution error
    //      out <- o // signal the cmd
    //      return // qui to unfreeze the wait group wg
    //  }
    // }

    // - more complex version with timeout
    timeout := time.After(time.Second * 3)
    for {
        select {
        case err, ok := <-outScan:
            if !ok { // if !ok, outScan is closed and we should quit the loop
                return
            }
            if err != nil {
                o.Err = err // save the execution error
                out <- o    // signal the cmd
                return      // quit to unfreeze the wait group wg
            }
        case <-timeout:
            o.Err = fmt.Errorf("timed out...%v", timeout) // save the execution error
            out <- o                                      // signal the cmd
            return                                        // quit to unfreeze the wait group wg
        }
    }

    // exit and unfreeze the wait group wg
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...