TCP-соединение возвращает ошибку «сломанный канал» при многократном использовании - PullRequest
0 голосов
/ 31 января 2019

Этот вопрос относится к go и его сетевому пакету.

Я написал простой tcp-сервер, обрабатывающий некоторые RPC.клиент использует chan net.Conn для управления всеми TCP-соединениями на стороне клиента.Сервер работает с прослушивателем tcp.

вот код: client:

package server

import (
    "errors"
    "log"
    "net"
)

var tcpPool chan net.Conn

func NewClient(connections int, address string) {

    tcpPool = make(chan net.Conn, connections)
    for i := 0; i < connections; i++ {
        conn, err := net.Dial("tcp4", address)
        if err != nil {
            log.Panic(err)
        }
        tcpPool <- conn
    }
}

func SendMessage(msg []byte) ([]byte, error) {
    conn := getConn()

    log.Println("check conn: ", conn)
    log.Println("msg: ", msg)

    defer releaseConn(conn)
    // send message
    n, err := conn.Write(msg)
    if err != nil {
        log.Panic(err)
    } else if n < len(msg) {
        log.Panic(errors.New("Message did not send in full"))
    }

    // receiving a message
    inBytes := make([]byte, 0)

    for {
        // bufsize 1024, read bufsize bytes each time
        b := make([]byte, bufSize)
        res, err := conn.Read(b)
        log.Println("server sends >>>>>>>>>>>>: ", res)
        if err != nil {
            b[0] = ReError
            break
        }
        inBytes = append(inBytes, b[:res]...)
        // message finished.
        if res < bufSize {
            break
        }
    }
    // check replied message
    if len(inBytes) == 0 {
        return []byte{}, errors.New("empty buffer error")
    }
    log.Println("SendMessage gets: ", inBytes)
    return inBytes, nil
}

func releaseConn(conn net.Conn) error {
    log.Println("return conn to pool")
    select {
    case tcpPool <- conn:
        return nil
    }
}

func getConn() (conn net.Conn) {
    log.Println("Take one from pool")
    select {
    case conn := <-tcpPool:
        return conn
    }
}

server

func StartTCPServer(network, addr string) error {
    listener, err := net.Listen(network, addr)
    if err != nil {
        return errors.Wrapf(err, "Unable to listen on address %s\n", addr)
    }
    log.Println("Listen on", listener.Addr().String())
    defer listener.Close()
    for {
        log.Println("Accept a connection request.")
        conn, err := listener.Accept()
        if err != nil {
            log.Println("Failed accepting a connection request:", err)
            continue
        }
        log.Println("Handle incoming messages.")
        go onConn(conn)
    }
}

//onConn recieves a tcp connection and waiting for incoming messages
func onConn(conn net.Conn) {
    inBytes := make([]byte, 0)
    defer func() {
        if e := recover(); e != nil {
            //later log
            if err, ok := e.(error); ok {
                println("recover", err.Error())
            }
        }
        conn.Close()
    }()
    // load msg
    for {
        buf := make([]byte, bufSize)
        res, err := conn.Read(buf)
        log.Println("server reading: ", res)
        inBytes = append(inBytes, buf[:res]...)
        if err != nil || res < bufSize {
            break
        }
    }

    var req RPCRequest
    err := json.Unmarshal(inBytes, &req)
    if err != nil {
        log.Panic(err)
    }
    log.Println("rpc request: ", req)

    var query UserRequest
    err = json.Unmarshal(req.Query, &query)
    if err != nil {
        log.Panic(err)
    }
    log.Println("rpc request query: ", query)

    // call method to process request
    // good now we can proceed to function call
    // some actual function calls gets a output
    // outBytes, err := json.Marshal(out)
    conn.Write(outBytes)
}

Я думаю, что это очень стандартно.но по какой-то причине я могу только отправить сообщение на стороне клиента один, а затем последующие 2-й и 3-й запуск показывают некоторую нерегулярность.

1st ---> success, получает ответ 2nd ---> clientможет отправить, но ничего не возвращается, журналы на стороне сервера не показывают в следующем сообщении 3-е ---> если я отправляю со стороны клиента еще раз, это показывает broken pipe ошибка ..

1 Ответ

0 голосов
/ 31 января 2019

Есть плохой способ обработки.Во-первых, флаг для гарантии того, что сообщение с сервера завершено, зависит от io.EOF, а не от длины

    // message finished.
    if res < 512 {
        break
    }

. Вместо этого читатель возвращает io.EOF - единственный символ, который показывает сообщение завершено.Во-вторых, chan type имеет свойство блокировать и не нужно использовать select. Кстати, вам действительно нужно запустить goroutine для освобождения.То же самое требование для getConn

func releaseConn(conn net.Conn)  {
    go func(){
        tcpPool <- conn
    }()
}

func getConn() net.Conn {
    con := <-tcpPool
    return con
}

В-третьих, слушатель не должен быть близко, код ниже плохой

defer listener.Close()

Самая важная причина на стороне клиента, res, err := conn.Read(b) это получениеответ с сервера.когда ничего не отвечаешь, он блокирует, а не io.EOF, ни какую-то другую ошибку.Это означает, что вы не можете поместить длительную часть сообщения в функцию send ().Вы можете сделать одну вещь, чтобы использовать sendmsg () для отправки, но никогда не используйте sendmsg () для обработки ответа.вы можете обработать ответ следующим образом

var receive chan string

func init() {
    receive = make(chan string, 10)
}
func ReceiveMessage(con net.Conn) {
    // receiving a message
    inBytes := make([]byte, 0, 1000)
    var b = make([]byte, 512)
    for {
        // bufsize 1024, read bufsize bytes each time
        res, err := con.Read(b)
        if err != nil {
            if err == io.EOF {
                break
            }
            fmt.Println(err.Error())
            break
        }
        inBytes = append(inBytes, b[:res]...)
        msg := string(inBytes)
        fmt.Println("receive msg from server:" + msg)
        receive <- msg
    }
}

Я обнаружил несколько проблем в вашем коде, но не могу сказать, какая из них приводит к вашей ошибке.Это мой код в соответствии с тем, что вы написали и сделали некоторые исправления.client.go:

package main

import (
    "fmt"
    "io"
    "log"
    "net"
)

var tcpPool chan net.Conn
var receive chan string

func init() {
    receive = make(chan string, 10)
}
func NewClient(connections int, address string) {
    tcpPool = make(chan net.Conn, connections)
    for i := 0; i < connections; i++ {
        conn, err := net.Dial("tcp", address)
        if err != nil {
            log.Panic(err)
        }
        tcpPool <- conn
    }
}

func SendMessage(con net.Conn, msg []byte) error {
    // send message
    _, err := con.Write(msg)
    if err != nil {
        log.Panic(err)
    }
    return nil
}

func ReceiveMessage(con net.Conn) {
    // receiving a message
    inBytes := make([]byte, 0, 1000)
    var b = make([]byte, 512)
    for {
        // bufsize 1024, read bufsize bytes each time
        res, err := con.Read(b)
        if err != nil {
            if err == io.EOF {
                break
            }
            fmt.Println(err.Error())
            break
        }
        inBytes = append(inBytes, b[:res]...)
        msg := string(inBytes)
        fmt.Println("receive msg from server:" + msg)
        receive <- msg
    }
}

func getConn() net.Conn {
    con := <-tcpPool
    return con
}

func main() {
    NewClient(20, "localhost:8101")
    con := <-tcpPool
    e := SendMessage(con, []byte("hello, i am client"))
    if e != nil {
        fmt.Println(e.Error())
        return
    }
    go ReceiveMessage(con)
    var msg string
    for {
        select {
        case msg = <-receive:
            fmt.Println(msg)
        }
    }
}

server.go

package main

import (
    "fmt"
    "io"
    "net"
)

func StartTCPServer(network, addr string) error {
    listener, err := net.Listen(network, addr)
    if err != nil {
        return err
    }
    for {
        conn, err := listener.Accept()
        if err != nil {

            fmt.Println(err.Error())
            continue

        }
        onConn(conn)
    }
}

//onConn recieves a tcp connection and waiting for incoming messages
func onConn(conn net.Conn) {
    inBytes := make([]byte, 0)
    // load msg
    for {
        buf := make([]byte, 512)
        res, err := conn.Read(buf)
        if err != nil {
            if err == io.EOF {
                return
            }
            fmt.Println(err.Error())
            return
        }
        inBytes = append(inBytes, buf[:res]...)

        fmt.Println("receive from client:" + string(inBytes))
        conn.Write([]byte("hello"))
    }
}

func main() {
    if e := StartTCPServer("tcp", ":8101"); e != nil {
        fmt.Println(e.Error())
        return
    }
}

это работает и без ошибок.Кстати, я не вижу, где на стороне клиента или на стороне сервера вы делаете con.Close ().Это необходимо закрыть. Это означает, что соединение, полученное из пула, не устанавливается.Когда вы думаете, что соединение окончено, закройте его и создайте новое соединение, чтобы заполнить пул, а не возвращайте его, потому что это фатальная операция - вернуть закрытый кон в пул.

...