Как освободить ресурс сервера websocket и redis в golang? - PullRequest
1 голос
/ 12 апреля 2019

У меня есть сервер шлюза, который может передавать сообщения на сторону клиента с помощью веб-сокета. Новый клиент подключен к моему серверу, я сгенерирую для него cid.И тогда я также подписываюсь на канал, который использует cid.Если какое-либо сообщение будет опубликовано на этом канале, Мой сервер отправит его на клиентскую сторону.На данный момент все устройства работают нормально, но когда я пытаюсь протестировать их с помощью теста thor , он вылетит, я прекрасно понимаю, что у DeliverMessage есть какая-то проблема, он никогда не выйдет, так как у него естьумереть-петлю.но поскольку redis нужно что-то подписывать, я не знаю, как избежать цикла.

func (h *Hub) DeliverMessage(pool *redis.Pool) {
    conn := pool.Get()
    defer conn.Close()
    var gPubSubConn *redis.PubSubConn
    gPubSubConn = &redis.PubSubConn{Conn: conn}
    defer gPubSubConn.Close()

    for {
        switch v := gPubSubConn.Receive().(type) {
        case redis.Message:
            // fmt.Printf("Channel=%q |  Data=%s\n", v.Channel, string(v.Data))
            h.Push(string(v.Data))
        case redis.Subscription:
            fmt.Printf("Subscription message: %s : %s %d\n", v.Channel, v.Kind, v.Count)
        case error:
            fmt.Println("Error pub/sub, delivery has stopped", v)
            panic("Error pub/sub")
        }
    }
}

В основной функции я вызывал вышеуказанную функцию как:

go h.DeliverMessage(pool)

Нокогда я тестирую его с огромным соединением, у меня появляется какая-то ошибка вроде:

ERR максимальное число клиентов достигло

Итак, я изменяю размер пула redis при изменении MaxIdle:

func newPool(addr string) *redis.Pool {
    return &redis.Pool{
        MaxIdle:     5000,
        IdleTimeout: 240 * time.Second,
        Dial:        func() (redis.Conn, error) { return redis.Dial("tcp", addr) },
    }
}

Но это все равно не работает, поэтому я хотел бы знать, есть ли какой-нибудь хороший способ убить горутин после того, как мой веб-сокет отключился от моего сервера в следующем выборе:

case client := <-h.Unregister:
    if _, ok := h.Clients[client]; ok {
        delete(h.Clients, client)
        delete(h.Connections, client.CID)
        close(client.Send)
        if err := gPubSubConn.Unsubscribe(client.CID); err != nil {
            panic(err)
        }
        // TODO kill subscribe goroutine if don't client-side disconnected ...

    }

Но как мне определить эту гурутину?Как я могу сделать это как unix способ.kill -9 <PID>

Ответы [ 2 ]

1 голос
/ 12 апреля 2019

Посмотрите на пример здесь

Вы можете завершить выполнение своей подпрограммы с помощью оператора return в вашем регистре переключателя в вашем сообщении DeliverMessage, если вы больше ничего не получаете.Я предполагаю case error, или, как видно из примера, case 0, вы захотите вернуться с этого, и ваша процедура будет отменена.Или, если я неправильно понимаю, и case client := <-h.Unregister: находится внутри DeliverMessage, просто вернитесь.

Вы также дважды закрываете соединение.defer gPubSubConn.Close() просто вызывает conn.Close (), поэтому вам не нужно defer conn.Close()

Также взгляните на Пул и посмотрите, что на самом деле делают все параметры.Если вы хотите обрабатывать много соединений, установите MaxActive на 0 «Когда ноль, нет ограничений на количество соединений в пуле».(а вы на самом деле хотите время простоя?)

0 голосов
/ 16 апреля 2019

На самом деле, у меня неправильная архитектура проекта, я собираюсь объяснить, что я хочу сделать.

  1. Клиент может подключиться к моему серверу веб-сокетов;

  2. Сервер имеет несколько обработчиков http, и администратор может публиковать данные через обработчик, структура данных может быть такой:

    {
       "cid": "something",
       "body": {
        }
    }
    

Поскольку у меня есть несколько узлов, работающих для обслуживания нашего клиента, и Nginx может отправлять каждый запрос с admin на совершенно другой узел, но только один узел удерживает соединение около cid с «чем-то», поэтому я будунеобходимо опубликовать эти данные на Redis, если какой-либо узел получил данные, он отправит это сообщение на клиентскую сторону.

3. Ищет NodeID, который я собираюсьPublish по заданному cid.

// redis code & golang
NodeID, err := conn.Do("HGET", "NODE_MAP", cid)

4.В данный момент я могу опубликовать любое сообщение из admin и опубликовать в NodeID, которое мы получили на шаге 3.

// redis code & golang
NodeID, err := conn.Do("PUBLISH", NodeID, data)

Время показывать код ядра, связанный с этим вопросом.Я собираюсь подписать канал, имя которого NodeID.как показано ниже.

go func(){
  for {
    switch v := gPubSubConn.Receive().(type) {
    case redis.Message:
        fmt.Println("Got a message", v.Data)
        h.Broadcast <- v.Data
        pipeline <- v.Data
    case error:
        panic(v)
    }
  }
}()

6. Чтобы управлять вашей веб-розеткой, вам также потребуется программа для этого.как следующим образом:

   go func () {
     for {
            select {
            case client := <-h.Register:
                h.Clients[client] = true
                cid := client.CID
                h.Connections[cid] = client

                body := "something"
                client.Send <- msg // greeting

            case client := <-h.Unregister:
                if _, ok := h.Clients[client]; ok {
                    delete(h.Clients, client)
                    delete(h.Connections, client.CID)
                    close(client.Send)
                }
            case message := <-h.Broadcast:
                fmt.Println("message is", message)
            }
        }
      }()

Последнее, что нужно, - это управление пулом redis, вам сейчас не нужен пул соединений.поскольку у нас есть только два goroutine, один основной процесс.

func newPool(addr string) *redis.Pool {
    return &redis.Pool{
        MaxIdle:     100,
        IdleTimeout: 240 * time.Second,
        Dial:        func() (redis.Conn, error) { return redis.Dial("tcp", addr) },
    }
}

var (
    pool        *redis.Pool
    redisServer = flag.String("redisServer", ":6379", "")
)
pool = newPool(*redisServer)
conn := pool.Get()
defer conn.Close()
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...