Я пытаюсь создать программу, которая будет подключаться к нескольким серверам через веб-сокеты горилл.В настоящее время у меня есть программа, которая перебирает список адресов серверов и создает новую программу, которая создаст свой собственный Websocket.conn и будет обрабатывать чтение и запись.
Проблема в том, что каждый раз, когда создается новая процедура, предыдущие процедуры блокируются, и может продолжаться только последняя.Я полагаю, что это потому, что библиотека веб-сокетов gorilla блокирует каждый gorotutine, но я могу ошибаться.
Я попытался установить таймер в итераторе списка серверов, и каждая goroutine будет работать отлично, но затем появится новая goroutineсделан с другим адресом, предыдущая процедура заблокирована.
Соответствующие биты моего кода:
В моем main.go
for _, server := range servers {
go control(ctx, server, port)
}
В control()
func control(ctx context.Context, server, port string) {
url := url.URL{
Scheme: "ws",
Host: server + ":" + port,
Path: "",
}
conn, _, err := websocket.DefaultDialer.Dial(url.String(), nil)
if err != nil {
panic(err)
}
defer conn.Close()
go sendHandler(ctx, conn)
go readHandler(ctx, conn)
}
readHandler(ctx context.Context, conn *websocket.Con) {
for {
_, p, err := conn.ReadMessage(); if err != nil {
panic(err)
}
select {
case <-ctx.Done():
goto TERM
default:
// do nothing
}
}
TERM:
// do termination
}
sendHandler(ctx context.Context, conn *websocket.Con) {
for _, msg := range msges {
err = conn.WriteMessage(websocket.TextMessage, msg)
if err != nil {
panic(err)
}
}
<-ctx.Done()
}
Я удалил части, где я добавляю группы ожидания и другие ненужные фрагменты кода.
Итак, я ожидаю, что будет запущено 3n goroutines (где n - количество серверов) безблокировка, но сейчас я вижу только 3 запущенные программы, которые вызываются последней итерацией списка серверов.
Спасибо!
РЕДАКТИРОВАТЬ 14/06/2019:
Я потратил некоторое время на создание небольшого рабочего примера, и в этом примере ошибка не возникала - ни один из потоков не блокировал друг друга.Я до сих пор не уверен, что вызвало это, но вот мой маленький рабочий пример:
main.go
package main
import (
"context"
"fmt"
"os"
"time"
"os/signal"
"syscall"
"sync"
"net/url"
"github.com/gorilla/websocket"
)
func main() {
servers := []string{"5555","5556", "5557"}
comms := make(chan os.Signal, 1)
signal.Notify(comms, os.Interrupt, syscall.SIGTERM)
ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)
var wg sync.WaitGroup
for _, server := range servers {
wg.Add(1)
go control(server,
ctx,
&wg)
}
<-comms
cancel()
wg.Wait()
}
func control(server string, ctx context.Context, wg *sync.WaitGroup) {
fmt.Printf("Started control for %s\n", server)
url := url.URL {
Scheme: "ws",
Host: "0.0.0.0" + ":" + server,
Path: "",
}
conn, _, err := websocket.DefaultDialer.Dial(url.String(), nil)
if err != nil {
panic(err)
}
defer conn.Close()
var localwg sync.WaitGroup
localwg.Add(1)
go sendHandler(ctx, conn, &localwg, server)
localwg.Add(1)
go readHandler(ctx, conn, &localwg, server)
<- ctx.Done()
localwg.Wait()
wg.Done()
return
}
func sendHandler(ctx context.Context, conn *websocket.Conn, wg *sync.WaitGroup, server string) {
for i := 0; i < 50; i++ {
err := conn.WriteMessage(websocket.TextMessage, []byte("ping"))
if err != nil {
panic(err)
}
fmt.Printf("sent msg to %s\n", server)
time.Sleep(1 * time.Second)
}
<- ctx.Done()
wg.Done()
}
func readHandler(ctx context.Context, conn *websocket.Conn, wg *sync.WaitGroup, server string) {
for {
select {
case <- ctx.Done():
wg.Done()
return
default:
_, p, err := conn.ReadMessage()
if err != nil {
wg.Done()
fmt.Println("done")
}
fmt.Printf("Got [%s] from %s\n", string(p), server)
}
}
}
Я протестировал его с помощью simple-websocket-server dpallot сервером 5555, 5556 и 5557 соответственно.