почему эта подпрограмма случайно не завершается, когда блокирующее соединение чтения закрыто? - PullRequest
0 голосов
/ 03 апреля 2019

Почему этот приемник отключается, когда соединение закрывается

Это работает, как и ожидалось, но затем случайным образом, каждые 20–10 000 раз, когда он вызывается, приемник не может отключиться, что затем вызывает обычную утечку, что приводит к 100% ЦП.

Примечание: если я зарегистрирую все ошибки, я увижу чтение на закрытом канале, если закомментировано conn.SetReadDeadline. При использовании я вижу тайм-аут ввода-вывода как ошибку.

Это выполнялось в течение 10 тыс. Циклов, когда основной процесс запускает 11 пар этих отправителей / получателей и обрабатывает 1000 заданий, прежде чем основной процесс отправит сигнал выключения. Эта установка работала в течение> 6 часов без каких-либо проблем до отметки 10 000 циклов в течение ночи, но сегодня утром я не могу заставить ее работать более 20 циклов, не помечая приемник как не выключающийся и не выходящий из системы.

func sender(w worker, ch channel) {

    var j job
    for {
        select {
        case <-ch.quit: // shutdown broadcast, exit
            w.Close()
            ch.stopped <- w.id // debug, send stop confirmed
            return

        case j = <-w.job: // worker designated jobs
        case j = <-ch.spawner: // FCFS jobs
        }

        ... prepare job ...

        w.WriteToUDP(buf, w.addr)

}

func receiver(w worker, ch channel) {

    deadline := 100 * time.Millisecond
out:
    for {
        w.SetReadDeadline(time.Now().Add(deadline))
        // blocking read, should release on close (or deadline)
        n, err = w.c.Read(buf)

        select {
        case <-ch.quit: // shutdown broadcast, exit
            ch.stopped <- w.id+100 // debug, receiver stop confirmed
            return
        default:
        }

        if n == 0 || err != nil {
            continue
        }
        update := &update{id: w.id}

         ... process update logic ...

        select {
        case <-ch.quit: // shutting down
            break out
        case ch.update <- update
        }

}

Мне нужен надежный способ заставить ресивер выключиться, когда он получает либо широковещательную рассылку, либо сообщение закрыто. Функционально закрытие канала должно быть достаточным и является предпочтительным методом в соответствии с пакетом go документация , см. Интерфейс Conn.

Я обновил до последней версии, которая является 1.12.1 без изменений. Работает на MacOS в разработке и CentOS в производстве.

Кто-нибудь сталкивался с этой проблемой? Если да, то как вы надежно это исправили?


Возможное решение

Мое очень многословное и неприглядное решение, которое, возможно, работает как обходной, заключается в следующем:

1) запустить отправителя в обычной процедуре, как это (выше, без изменений)

2) запустить приемник в обычном режиме, например так (ниже)

func receive(w worker, ch channel) {

    request := make(chan []byte, 1)
    reader := make(chan []byte, 1)

    defer func() {
        close(request) // exit signaling
        w.c.Close()    // exit signaling
        //close(reader)
    }()

    go func() {

        // untried senario, for if we still have leaks -> 100% cpu
        // we may need to be totally reliant on closing request or ch.quit
        // defer w.c.Close()

        deadline := 100 * time.Millisecond
        var n int
        var err error

        for buf := range request {
            for {
                select {
                case <-ch.quit: // shutdown signal
                    return
                default:
                }
                w.c.SetReadDeadline(time.Now().Add(deadline))
                n, err = w.c.Read(buf)
                if err != nil { // timeout or close
                    continue
                }
                break
            }
            select {
            case <-ch.quit: // shutdown signal
                return
            case reader <- buf[:n]:
                //default:
            }
        }
    }()

    var buf []byte

out:
    for {

        request <- make([]byte, messageSize)

        select {
        case <-ch.quit: // shutting down
            break out
        case buf = <-reader:
        }

        update := &update{id: w.id}

      ... process update logic ...


        select {
        case <-ch.quit: // shutting down
            break out
        case ch.update <- update
        }

    }

Мой вопрос: почему эта ужасная версия 2, которая порождает новую подпрограмму go для чтения из блокирующего c.Read (buf), кажется, работает более надежно, то есть не пропускает при отправке сигнала выключения, когда гораздо более простая первая версия этого не сделала ... и похоже, что по сути это одно и то же из-за блокировки c.Read (buf).

Понижение моего вопроса НЕ ПОЛЕЗНО, если это законный и поддающийся проверке вопрос, вопрос остается без ответа.

1 Ответ

0 голосов
/ 05 апреля 2019

Спасибо всем за ответы.

Итак. Там никогда не было следа стека. На самом деле, я не получил никаких ошибок, ни обнаружения гонки, ни чего-либо еще, и он не был заблокирован, процедура go просто не закрывалась и не выходила, и она не была постоянно воспроизводимой. Я использую одни и те же данные в течение двух недель.

Когда процедура go не сможет сообщить о выходе, она просто выйдет из-под контроля и доведет процессор до 100%, но только ПОСЛЕ того, как все остальные выйдут, и система продолжит работу. Я никогда не видел, чтобы память росла. Процессор постепенно тикает до 200%, 300%, 400%, и тогда система должна быть перезагружена.

Я зарегистрировался, когда произошла утечка, она всегда была другой, и я получил одну утечку после 380 предыдущих успешных запусков (из 23 пар подпрограмм go, работающих в unision), в следующий раз в 1832 году перед одним получателем утечка, в следующий раз только 23, с точно такими же данными, пережевываемыми в той же начальной точке. Протекший приемник просто вышел из-под контроля, но только после того, как группа из 22 других компаньонов отключилась и успешно вышла из системы, и система перешла к следующей партии. Он не будет постоянно выходить из строя, за исключением того, что в какой-то момент гарантированно произойдет утечка.

После многих дней, многочисленных переписываний и миллионного журнала до / после каждого действия, наконец-то, похоже, в этом и заключалась проблема, и после копания в библиотеке я не уверен, почему именно, и ПОЧЕМУ это происходит случайным образом .

По какой-либо причине библиотека golang.org/x/net/dns/dnsmessage будет беспорядочно выходить из строя, если вы проанализируете и сразу перейдете к пропуску вопросов, не прочитав сначала вопрос. Понятия не имею, почему это важно, привет, пропуск вопросов означает, что вы не заботитесь об этом разделе заголовка и не помечаете его как обработанный, и он прекрасно работает буквально миллион раз подряд, но тогда нет, так что вы, кажется, необходимо прочитать вопрос ДО того, как вы сможете пропустить SkipAllQuestions, так как этот кажется решением. У меня 18 525 партий, и добавление, которое отключило утечки.

var p dnsmessage.Parser
h, err := p.Start(buf[:n])
if err != nil {
    continue // what!?
}

switch {
case h.RCode == dnsmessage.RCodeSuccess:
    q, err := p.Question() // should only have one question
    if q.Type != w.Type || err != nil {
        continue // what!?, impossible
    }
    // Note: if you do NOT do the above first, you're asking for pain! (tr)
    if err := p.SkipAllQuestions(); err != nil {
        continue // what!?
    }
    // Do not count as "received" until we have passed above point and
    // validated that response had a question that we could skip...

...