Как максимизировать UDP-пакеты в секунду с Go? - PullRequest
1 голос
/ 21 февраля 2020

Я пытаюсь написать простой сервер ретрансляции в Go для отправки игровых данных между двумя клиентами. Клиенты отправляют сообщения регистрации UDP, состоящие из идентификатора протокола, за которым следуют идентификатор совпадения и идентификатор клиента; Эти сообщения используются для настройки хеш-таблицы адресов, чтобы при получении стандартного сообщения (игровых данных) мы могли сопоставить его с предварительно зарегистрированным получателем (без необходимости отправлять эти данные в каждом сообщении). (Я использую github.com / cornelk / hashmap , если это имеет какое-то значение, а сервер является экземпляром GCP Compute highcpu-16)

Эта установка отлично работает с небольшим числом клиенты (~ 30 сообщений в секунду). Однако, по мере того как я увеличиваю нагрузочное тестирование, уровни исходящей пропускной способности отключаются, поскольку входящая пропускная способность продолжает расти. Я выполнил регистрацию с vmstat и ifstat (а также с некоторым dropwatch мониторингом, который говорит, что множество пакетов отбрасывается при уровень программного обеспечения). Мне кажется, что пакеты отбрасываются, потому что мой go сервер недостаточно быстро их читает.

Изначально я использовал одну загрузку на ядро ​​процессора:

runtime.GOMAXPROCS(runtime.NumCPU())

connection, err := net.ListenUDP("udp", &addr)
if err != nil {
    panic(err)
}

for i := 0; i < runtime.NumCPU(); i++ {
    go listen(connection, c)
}

I ' Мы также пытались повторно использовать порт и прослушивать отдельно с каждой программой (используя пакет github.com / libp2p / go -reuseport ).

Наконец, я попытался настроить буферизованные каналы для входящие и исходящие сообщения, чтобы минимизировать время, в течение которого прослушивающие программы не извлекают сообщения.

Чего мне не хватает при попытке обработать 100 000 игроков одновременно? Такое ощущение, что, независимо от моего подхода, я не могу превзойти около 3000 игроков, не потеряв значительное количество пакетов.

Моя listen функция прошла через много итераций в тщетных попытках увеличить количество пакетов в секунду, но обычно это:

// Emits SIGABRT to the interrupts channel if an error occurs outside of individual message handling.
func listen(connection *net.UDPConn, interrupts chan os.Signal/*, inbox chan IncomingMessage*/) {
    buffer := make([]byte, 1024)
    n, remoteAddr, err := 0, new(net.UDPAddr), error(nil)
    for err == nil {
        n, remoteAddr, err = connection.ReadFromUDP(buffer)
        if err != nil {
            continue
        }

        //log.Println("Received", n, "bytes", hex.EncodeToString(buffer[:n]))
        if n < 2 {
            log.Println("Minimum packet length is 2 bytes. Received a packet of length", n, "bytes")
            continue
        }

        //inbox<-IncomingMessage{
        //    sender: remoteAddr,
        //    data:   append([]byte(nil), buffer[:n]...),
        //}

        //go handlePacket(inbox, remoteAddr, append([]byte(nil), buffer[:n]...), n)

        protocolId := binary.LittleEndian.Uint16(buffer)
        if protocolId == registrationProtocolId {
            // Start a goroutine to handle the packet (copy the buffer minus the protocol id))
            //go handleRegistrationPacket(connection, remoteAddr, append([]byte(nil), buffer[:n]...))
            handleRegistrationPacket(outbox, remoteAddr, buffer[:n])
        } else if protocolId == matchProtocolId {
            //go handleStandardPacket(connection, remoteAddr.String(), append([]byte(nil), buffer[:n]...))
            handleStandardPacket(outbox, remoteAddr.String(), buffer[:n])
        } else {
            log.Println("Unrecognised protocol id: ", protocolId)
        }
    }
    log.Println("Listener failed:", err)
    interrupts<-syscall.SIGABRT
}

На этом графике показано 1600 одновременных игр (3200 клиентов). После определенного момента исходящие КБ / с перестают подниматься. Процессор даже не потеет. Load-Test Graph showing Network bandwidth and cpu load

1 Ответ

0 голосов
/ 24 февраля 2020

Я перечитал Как получать миллион пакетов в секунду , что говорит о том, что использование опции SO_REUSEPORT в сокетах - правильный подход. В приведенном ниже коде я использую соединение на ядро ​​компьютера (16 в моем случае) и 4 обработчика исходящих сообщений на соединение. Использование ЦП несколько выше, 4 на соединение, но кажется немного более надежным, чем 1 на соединение (хотя требуется тестирование и однозначный ответ).

Ниже не показано, что функция listen запускает процедуры обрабатывать входящие сообщения и сразу же возвращается к прослушиванию. outbox передается в функцию обработки в качестве средства отправки сообщений.

type OutgoingMessage struct {
    recipient *net.UDPAddr
    data      []byte
}

// ...

func beginListen(c chan os.Signal) {
    addr := net.UDPAddr{
        Port: 1234,
        IP:   net.IP{0, 0, 0, 0},
    }

    connection, err := reuseport.ListenPacket("udp", addr.String())

    if err != nil {
        panic(err)
    }

    outbox := make(chan OutgoingMessage, maxQueueSize)

    sendFromOutbox := func() {
        n, err := 0, error(nil)
        for msg := range outbox {
            n, err = connection.(*net.UDPConn).WriteToUDP(msg.data, msg.recipient)
            if err != nil {
                panic(err)
            }
            if n != len(msg.data) {
                log.Println("Tried to send", len(msg.data), "bytes but only sent ", n)
            }
        }
    }

    for i := 1;  i <= 4; i++ {
        go sendFromOutbox()
    }

    listen(connection.(*net.UDPConn), c, outbox)

    close(outbox)
}

func main() {
    log.Println("Starting...")

    runtime.GOMAXPROCS(runtime.NumCPU())

    c := make(chan os.Signal, 1)

    for i := 0; i < runtime.NumCPU(); i++ {
        go beginListen(c)
    }

    // ...
}

На приведенном ниже графике показано отличие от предыдущего: Graph showing bandwitch and cpu load

(До сих пор это был скачок; имена переменных не останутся, и я знаю, что должен продолжать записывать неотправленные байты, пока не будут отправлены все данные.)

...