Обработка нескольких соединений через веб-сокет - PullRequest
0 голосов
/ 13 июня 2019

Я пытаюсь создать программу, которая будет подключаться к нескольким серверам через веб-сокеты горилл.В настоящее время у меня есть программа, которая перебирает список адресов серверов и создает новую программу, которая создаст свой собственный Websocket.conn и будет обрабатывать чтение и запись.

Проблема в том, что каждый раз, когда создается новая процедура, предыдущие процедуры блокируются, и может продолжаться только последняя.Я полагаю, что это потому, что библиотека веб-сокетов gorilla блокирует каждый gorotutine, но я могу ошибаться.

Я попытался установить таймер в итераторе списка серверов, и каждая goroutine будет работать отлично, но затем появится новая goroutineсделан с другим адресом, предыдущая процедура заблокирована.

Соответствующие биты моего кода:

В моем main.go

for _, server := range servers {
  go control(ctx, server, port)
}

В control()


func control(ctx context.Context, server, port string) { 
  url := url.URL{
    Scheme: "ws",
    Host: server + ":" + port,
    Path: "",
  }
  conn, _, err := websocket.DefaultDialer.Dial(url.String(), nil)
  if err != nil {
    panic(err)
  }
  defer conn.Close()
  go sendHandler(ctx, conn)
  go readHandler(ctx, conn)
}

readHandler(ctx context.Context, conn *websocket.Con) {
  for {
    _, p, err := conn.ReadMessage(); if err != nil {
      panic(err)
    }
    select {
      case <-ctx.Done():
        goto TERM
      default:
        // do nothing
    }
  }
  TERM:
  // do termination  
}

sendHandler(ctx context.Context, conn *websocket.Con) {
  for _, msg := range msges {
    err = conn.WriteMessage(websocket.TextMessage, msg)
    if err != nil {
      panic(err)
    }
  }
  <-ctx.Done()
}

Я удалил части, где я добавляю группы ожидания и другие ненужные фрагменты кода.

Итак, я ожидаю, что будет запущено 3n goroutines (где n - количество серверов) безблокировка, но сейчас я вижу только 3 запущенные программы, которые вызываются последней итерацией списка серверов.

Спасибо!

РЕДАКТИРОВАТЬ 14/06/2019:

Я потратил некоторое время на создание небольшого рабочего примера, и в этом примере ошибка не возникала - ни один из потоков не блокировал друг друга.Я до сих пор не уверен, что вызвало это, но вот мой маленький рабочий пример:

main.go

package main

import (
    "context"
    "fmt"
    "os"
    "time"
    "os/signal"
    "syscall"
    "sync"
    "net/url"
    "github.com/gorilla/websocket"
    )

func main() {
    servers := []string{"5555","5556", "5557"}
    comms := make(chan os.Signal, 1)
    signal.Notify(comms, os.Interrupt, syscall.SIGTERM)

    ctx := context.Background()
    ctx, cancel := context.WithCancel(ctx)
    var wg sync.WaitGroup

    for _, server := range servers {
        wg.Add(1)
        go control(server,
                   ctx,
                   &wg)
    }

    <-comms
    cancel()
    wg.Wait()
}

func control(server string, ctx context.Context, wg *sync.WaitGroup) {
    fmt.Printf("Started control for %s\n", server)
    url := url.URL {
        Scheme: "ws",
        Host: "0.0.0.0" + ":" + server,
        Path: "",
    }
    conn, _, err := websocket.DefaultDialer.Dial(url.String(), nil)
    if err != nil {
        panic(err)
    }
    defer conn.Close()

    var localwg sync.WaitGroup

    localwg.Add(1)
    go sendHandler(ctx, conn, &localwg, server)
    localwg.Add(1)
    go readHandler(ctx, conn, &localwg, server)

    <- ctx.Done()
    localwg.Wait()
    wg.Done()
    return
}

func sendHandler(ctx context.Context, conn *websocket.Conn, wg *sync.WaitGroup, server string) {
    for i := 0; i < 50; i++ {
        err := conn.WriteMessage(websocket.TextMessage, []byte("ping"))
        if err != nil {
            panic(err)
        }
        fmt.Printf("sent msg to %s\n", server)
        time.Sleep(1 * time.Second)
    }
    <- ctx.Done()
    wg.Done()
}

func readHandler(ctx context.Context, conn *websocket.Conn, wg *sync.WaitGroup, server string) {
    for {

        select {

            case <- ctx.Done():
                wg.Done()
                return
            default:
                _, p, err :=  conn.ReadMessage()
                if err != nil {
                    wg.Done()
                    fmt.Println("done")
                }
                fmt.Printf("Got [%s] from %s\n", string(p), server)
        }
    }
}

Я протестировал его с помощью simple-websocket-server dpallot сервером 5555, 5556 и 5557 соответственно.

1 Ответ

0 голосов
/ 14 июня 2019

Эта часть вашего кода вызывает проблему:

conn, _, err := websocket.DefaultDialer.Dial(url.String(), nil)
if err != nil {
    panic(err)
}
defer conn.Close()
go sendHandler(ctx, conn)
go readHandler(ctx, conn)

Вы создаете соединение, откладываете его закрытие, запускаете две другие программы и затем завершаете функцию.Функция end закрывает сокет из-за вашей отсрочки.

...