Закрыть чтение goroutine из TCP-соединения без закрытия соединения - PullRequest
0 голосов
/ 05 мая 2018

Мне нравится, как Go обрабатывает внутреннее мультиплексирование ввода-вывода, которое epoll и другие механизмы и планируют зеленые потоки (здесь процедура go), что дает свободу писать синхронный код.

Я знаю, что TCP-сокеты равны non-blocking, а read выдаст EAGAIN, если данные недоступны. Учитывая это, conn.Read(buffer) обнаружит это, а блокирует процедуру go, выполняя чтение соединения без данных, доступных в буфере сокета . Есть ли способ, чтобы остановить такие рутины без закрытия основного соединения. Я использую пул соединений, поэтому закрытие TCP-соединения для меня не имеет смысла, и я хочу вернуть это соединение обратно в пул.

Вот код для симуляции такого сценария:

func main() {
    conn, _ := net.Dial("tcp", "127.0.0.1:9090")
    // Spawning a go routine
    go func(conn net.Conn) {
        var message bytes.Buffer
        for {
            k := make([]byte, 255) // buffer
            m, err := conn.Read(k) // blocks here 
            if err != nil {
                if err != io.EOF {
                    fmt.Println("Read error : ", err)
                } else {
                    fmt.Println("End of the file")
                }
                break // terminate loop if error
            }
            // converting bytes to string for printing
            if m > 0 {
                for _, b := range k {
                    message.WriteByte(b)
                }
                fmt.Println(message.String())
            }

        }
    }(conn)

    // prevent main from exiting
    select {}
}

Какие другие подходы я могу использовать, если это невозможно:

1) Позвоните syscall.Read и обработайте это вручную. В этом случае мне нужен способ проверить, является ли сокет читаемым перед вызовом syscall.Read, в противном случае я буду тратить ненужные циклы ЦП. Для моего сценария, я думаю, что я могу пропустить событие опроса на основе событий и продолжать вызывать syscall.Read, так как в моем случае использования всегда есть данные.

2) Любые предложения:)

1 Ответ

0 голосов
/ 05 мая 2018
func receive(conn net.TCPConn, kill <-chan struct{}) error {
    // Spawn a goroutine to read from the connection.
    data := make(chan []byte)
    readErr := make(chan error)
    go func() {
        for {
            b := make([]byte, 255)
            _, err := conn.Read(b)
            if err != nil {
                readErr <- err
                break
            }
            data <- b
        }
    }()


    for {
        select {
        case b := <-data:
            // Do something with `b`.
        case err := <-readErr:
            // Handle the error.
            return err
        case <-kill:
            // Received kill signal, returning without closing the connection.
            return nil
        }
    }
}

Отправьте пустую структуру на kill из другой программы, чтобы прекратить прием соединения. Вот программа, которая перестает получать через секунду:

kill := make(chan struct{})
go func() {
    if err := receive(conn, kill); err != nil {
        log.Fatal(err)
    }
}()
time.Sleep(time.Second)
kill <- struct{}{}

Это может быть не совсем то, что вы ищете, потому что программа чтения все равно будет заблокирована на Read даже после отправки на kill. Однако процедура обработки входящих чтений будет прекращена.

...