Golang - масштабирование клиента веб-сокета для нескольких соединений с разными серверами - PullRequest
0 голосов
/ 01 февраля 2019

У меня есть клиент веб-сокета.В действительности это намного сложнее, чем основной код, показанный ниже.Теперь мне нужно масштабировать этот клиентский код для открытия соединений с несколькими серверами.В конечном итоге задачи, которые необходимо выполнить при получении сообщения от серверов, идентичны.Каков наилучший способ справиться с этим?Как я уже говорил выше, фактический код, выполняемый при получении сообщения, намного сложнее, чем показано в примере.

package main

import (
        "flag"
        "log"
        "net/url"
        "os"
        "os/signal"
        "time"

        "github.com/gorilla/websocket"
)

var addr = flag.String("addr", "localhost:1234", "http service address")

func main() {
        flag.Parse()
        log.SetFlags(0)

        interrupt := make(chan os.Signal, 1)
        signal.Notify(interrupt, os.Interrupt)

        // u := url.URL{Scheme: "ws", Host: *addr, Path: "/echo"}
        u := url.URL{Scheme: "ws", Host: *addr, Path: "/"}
        log.Printf("connecting to %s", u.String())

        c, _, err := websocket.DefaultDialer.Dial(u.String(), nil)
        if err != nil {
                log.Fatal("dial:", err)
        }
        defer c.Close()

        done := make(chan struct{})

        go func() {
                defer close(done)
                for {
                        _, message, err := c.ReadMessage()
                        if err != nil {
                                log.Println("read:", err)
                                return
                        }
                        log.Printf("recv: %s", message)
                }
        }()

        ticker := time.NewTicker(time.Second)
        defer ticker.Stop()

        for {
                select {
                case <-done:
                        return
                case t := <-ticker.C:
                        err := c.WriteMessage(websocket.TextMessage, []byte(t.String()))
                        if err != nil {
                                log.Println("write:", err)
                                return
                        }
                case <-interrupt:
                        log.Println("interrupt")

                        // Cleanly close the connection by sending a close message and then
                        // waiting (with timeout) for the server to close the connection.
                        err := c.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""))
                        if err != nil {
                                log.Println("write close:", err)
                                return
                        }
                        select {
                        case <-done:
                        case <-time.After(time.Second):
                        }
                        return
                }
        }
}

Ответы [ 2 ]

0 голосов
/ 01 февраля 2019

Изменить обработку прерываний, чтобы закрыть канал при прерывании.Это позволяет нескольким процедурам ожидать события, ожидая закрытия канала.

shutdown := make(chan struct{})
interrupt := make(chan os.Signal, 1)
signal.Notify(interrupt, os.Interrupt)
go func() {
    <-interrupt
    log.Println("interrupt")
    close(shutdown)
}()

Перемещение кода для соединения в функцию.Этот код является копией и вставкой из вопроса с двумя изменениями: канал прерывания заменяется каналом отключения;Функция уведомляет sync.WaitGroup о завершении функции.

func connect(u string, shutdown chan struct{}, wg *sync.WaitGroup) {
    defer wg.Done()

    log.Printf("connecting to %s", u)
    c, _, err := websocket.DefaultDialer.Dial(u, nil)
    if err != nil {
        log.Fatal("dial:", err)
    }
    defer c.Close()

    done := make(chan struct{})

    go func() {
        defer close(done)
        for {
            _, message, err := c.ReadMessage()
            if err != nil {
                log.Println("read:", err)
                return
            }
            log.Printf("recv: %s", message)
        }
    }()

    ticker := time.NewTicker(time.Second)
    defer ticker.Stop()

    for {
        select {
        case <-done:
            return
        case t := <-ticker.C:
            err := c.WriteMessage(websocket.TextMessage, []byte(t.String()))
            if err != nil {
                log.Println("write:", err)
                return
            }
        case <-shutdown:
            // Cleanly close the connection by sending a close message and then
            // waiting (with timeout) for the server to close the connection.
            err := c.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""))
            if err != nil {
                log.Println("write close:", err)
                return
            }
            select {
            case <-done:
            case <-time.After(time.Second):
            }
            return
        }
    }
}

Объявите sync.WaitGroup в main().Для каждой конечной точки веб-сокета, к которой вы хотите подключиться, увеличьте WaitGroup и запустите процедуру подключения этой конечной точки.После запуска goroutines, дождитесь завершения WaitGroup для goroutines.

var wg sync.WaitGroup
for _, u := range endpoints { // endpoints is []string 
                              // where elements are URLs 
                              // of endpoints to connect to.
    wg.Add(1)
    go connect(u, shutdown, &wg)
}
wg.Wait()

Приведенный выше код с правкой, позволяющей запустить его на эхо-сервере Gorilla, размещен на детской площадке .

0 голосов
/ 01 февраля 2019

является ли связь с каждым другим сервером полностью независимой от других серверов?если да, то я бы обошел его так:

  • в main создать контекст с функцией отмены
  • создать группа ожидания в главном для отслеживания запущенных подпрограмм
  • для каждого сервера, добавление в группу ожидания, запуск новой подпрограммы из главной функции, передавая контекст и ссылки в группу ожидания
  • main входит в цикл for / select, прослушивая сигналы, и если он приходит, вызывает cancelfunc и ожидает в группе ожидания.
  • main также может прослушивать результаты чана из процедур и, возможно, печатать результаты самостоятельно, если программы не должны делать это напрямую.
  • каждые процедуры имеет, как мы сказали, ссылки на wg, контекст и, возможно, канал для возврата результатов.Теперь подход разделяется на то, должен ли горутин делать только одно и то же, или если ему нужно выполнить последовательность действий.для первого подхода
  • , если нужно сделать только одну вещь, мы следуем подходу, подобному описанному здесь (заметьте, что, чтобы быть асинхронным, он, в свою очередь, запустит новую программу, чтобы выполнитьшаг DoSomething (), который возвращает результат в канале), который позволяет ему принимать сигнал отмены в любое время.вам решать, насколько неблокирующим вы хотите быть и насколько быстро вы хотите отвечать на сигналы отмены. Также преимущество того, что связанный с ним контекст передается в подпрограммы, состоит в том, что вы можете вызвать включенный контекстверсии большинства библиотечных функций.Например, если вы хотите, чтобы у ваших циферблатов был тайм-аут, скажем, 1 минута, вы должны создать новый контекст с тайм-аутом из прошедшего, а затем DialContext с этим.Это позволяет циферблату останавливаться как по тайм-ауту, так и по вызову cancelfunc родительского (тот, который вы создали в основном) контекста.
  • если нужно сделать больше вещей, я обычно предпочитаю сделать что-то с горутинпусть он вызовет новый с последующим выполнением следующего шага (передача всех ссылок по конвейеру) и завершится.

этот подход хорошо масштабируется с отменами и возможностью остановить конвейер в любой моментшаг, а также легко поддерживать контексты с дефинициями для шагов, которые могут занять слишком много времени.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...