Я транслирую / сбрасываю выходные команды на клиент через события на стороне сервера. Кажется, что он работает нормально во время выполнения, но я изо всех сил пытаюсь получить последовательный тест, который проверяет тело ответа. Этот тест иногда зависает / блокируется, потому что if resp.Body.Bytes() == nil
никогда не бывает ложным. Большую часть времени он проходит.
Я высмеиваю команду os через другой тестовый пример, который записывает жестко запрограммированное значение в стандартный вывод. ( Благодаря этому сообщению в блоге ).
Я думал, что в execute
возникает состояние гонки, когда cmd.Wait()
завершает работу, прежде чем сканер сможет сканировать. Но я блокирую завершение сканера после cmd.Wait()
, чтобы гарантировать, что мы закончили со сканером и что у него есть все буферизованные данные.
Если команда завершается до того, как сканер будет готов , scanner.Scan()
возвращает false из-за немедленного достижения конца ввода?
Если я заменю проверку resp.Body
nil
на time.Sleep
, она, кажется, проходит каждый раз, так что я думаю что execute
работает нормально и что тест не идеален. Но я не уверен.
У меня есть cmd.Wait()
до <-scannerDone
, потому что с командой связана отмена контекста. Я хочу поймать отмену немедленно. Если я поменяю их местами, клиент, похоже, получит все буферизованные данные вместо того, чтобы выходной канал немедленно закрылся.
package playground
import (
"bufio"
"fmt"
"net/http"
"os/exec"
)
type SSE struct {
cmdCh chan *exec.Cmd
}
func (s *SSE) HandleSSE(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
flusher, ok := w.(http.Flusher)
if !ok {
http.Error(w, "Streaming unsupported", http.StatusInternalServerError)
return
}
for {
select {
case cmd := <-s.cmdCh:
output := make(chan []byte)
go execute(cmd, output)
for data := range output {
log.Println(string(data))
fmt.Fprintf(w, "data: %s\n\n", data)
flusher.Flush()
}
case <-r.Context().Done():
return
}
}
}
func execute(cmd *exec.Cmd, output chan []byte) {
defer close(output)
cmdReader, err := cmd.StdoutPipe()
if err != nil {
output <- []byte(fmt.Sprintf("Error getting stdout pipe: %v", err))
return
}
cmd.Stderr = cmd.Stdout
scanner := bufio.NewScanner(cmdReader)
scannerDone := make(chan struct{})
go func() {
for scanner.Scan() {
log.Println(string(scanner.Bytes())
output <- scanner.Bytes()
}
scannerDone <- struct{}{}
}()
err = cmd.Start()
if err != nil {
output <- []byte(fmt.Sprintf("Error executing: %v", err))
return
}
err = cmd.Wait()
if err != nil {
output <- []byte(err.Error())
}
// Make sure scanner is done before closing output
<-scannerDone
}
package playground
import (
"fmt"
"net/http"
"net/http/httptest"
"os"
"os/exec"
"testing"
)
func Test_execute(t *testing.T) {
svr := &SSE{
cmdCh: make(chan *exec.Cmd),
}
req, err := http.NewRequest("GET", "/sse", nil)
if err != nil {
t.Fatal(err)
}
resp := httptest.NewRecorder()
handler := http.HandlerFunc(svr.HandleSSE)
// Run HandleSSE in go routine so it won't block
go func() {
handler.ServeHTTP(resp, req)
}()
svr.cmdCh <- mockExecCommand("echo", "test")
for {
if resp.Body.Bytes() == nil {
continue
}
break
}
data := string(resp.Body.Bytes())
if data != "data: test\n\n" {
t.Errorf("expected data: test\n\n, got %s", data)
}
}
func mockExecCommand(name string, args ...string) *exec.Cmd {
cs := []string{"-test.run=TestHelperProcess", "--", name}
cs = append(cs, args...)
cmd := exec.Command(os.Args[0], cs...)
cmd.Env = []string{"GO_WANT_HELPER_PROCESS=1"}
return cmd
}
func TestHelperProcess(t *testing.T) {
if os.Getenv("GO_WANT_HELPER_PROCESS") != "1" {
return
}
fmt.Fprint(os.Stdout, "test")
os.Exit(0)
}