На самом деле, у меня неправильная архитектура проекта, я собираюсь объяснить, что я хочу сделать.
Клиент может подключиться к моему серверу веб-сокетов;
Сервер имеет несколько обработчиков http, и администратор может публиковать данные через обработчик, структура данных может быть такой:
{
"cid": "something",
"body": {
}
}
Поскольку у меня есть несколько узлов, работающих для обслуживания нашего клиента, и Nginx может отправлять каждый запрос с admin
на совершенно другой узел, но только один узел удерживает соединение около cid
с «чем-то», поэтому я будунеобходимо опубликовать эти данные на Redis
, если какой-либо узел получил данные, он отправит это сообщение на клиентскую сторону.
3. Ищет NodeID, который я собираюсьPublish
по заданному cid.
// redis code & golang
NodeID, err := conn.Do("HGET", "NODE_MAP", cid)
4.В данный момент я могу опубликовать любое сообщение из admin
и опубликовать в NodeID
, которое мы получили на шаге 3.
// redis code & golang
NodeID, err := conn.Do("PUBLISH", NodeID, data)
Время показывать код ядра, связанный с этим вопросом.Я собираюсь подписать канал, имя которого NodeID.как показано ниже.
go func(){
for {
switch v := gPubSubConn.Receive().(type) {
case redis.Message:
fmt.Println("Got a message", v.Data)
h.Broadcast <- v.Data
pipeline <- v.Data
case error:
panic(v)
}
}
}()
6. Чтобы управлять вашей веб-розеткой, вам также потребуется программа для этого.как следующим образом:
go func () {
for {
select {
case client := <-h.Register:
h.Clients[client] = true
cid := client.CID
h.Connections[cid] = client
body := "something"
client.Send <- msg // greeting
case client := <-h.Unregister:
if _, ok := h.Clients[client]; ok {
delete(h.Clients, client)
delete(h.Connections, client.CID)
close(client.Send)
}
case message := <-h.Broadcast:
fmt.Println("message is", message)
}
}
}()
Последнее, что нужно, - это управление пулом redis, вам сейчас не нужен пул соединений.поскольку у нас есть только два goroutine
, один основной процесс.
func newPool(addr string) *redis.Pool {
return &redis.Pool{
MaxIdle: 100,
IdleTimeout: 240 * time.Second,
Dial: func() (redis.Conn, error) { return redis.Dial("tcp", addr) },
}
}
var (
pool *redis.Pool
redisServer = flag.String("redisServer", ":6379", "")
)
pool = newPool(*redisServer)
conn := pool.Get()
defer conn.Close()