Почему этот приемник отключается, когда соединение закрывается
Это работает, как и ожидалось, но затем случайным образом, каждые 20–10 000 раз, когда он вызывается, приемник не может отключиться, что затем вызывает обычную утечку, что приводит к 100% ЦП.
Примечание: если я зарегистрирую все ошибки, я увижу чтение на закрытом канале, если закомментировано conn.SetReadDeadline. При использовании я вижу тайм-аут ввода-вывода как ошибку.
Это выполнялось в течение 10 тыс. Циклов, когда основной процесс запускает 11 пар этих отправителей / получателей и обрабатывает 1000 заданий, прежде чем основной процесс отправит сигнал выключения. Эта установка работала в течение> 6 часов без каких-либо проблем до отметки 10 000 циклов в течение ночи, но сегодня утром я не могу заставить ее работать более 20 циклов, не помечая приемник как не выключающийся и не выходящий из системы.
func sender(w worker, ch channel) {
var j job
for {
select {
case <-ch.quit: // shutdown broadcast, exit
w.Close()
ch.stopped <- w.id // debug, send stop confirmed
return
case j = <-w.job: // worker designated jobs
case j = <-ch.spawner: // FCFS jobs
}
... prepare job ...
w.WriteToUDP(buf, w.addr)
}
func receiver(w worker, ch channel) {
deadline := 100 * time.Millisecond
out:
for {
w.SetReadDeadline(time.Now().Add(deadline))
// blocking read, should release on close (or deadline)
n, err = w.c.Read(buf)
select {
case <-ch.quit: // shutdown broadcast, exit
ch.stopped <- w.id+100 // debug, receiver stop confirmed
return
default:
}
if n == 0 || err != nil {
continue
}
update := &update{id: w.id}
... process update logic ...
select {
case <-ch.quit: // shutting down
break out
case ch.update <- update
}
}
Мне нужен надежный способ заставить ресивер выключиться, когда он получает либо широковещательную рассылку, либо сообщение закрыто. Функционально закрытие канала должно быть достаточным и является предпочтительным методом в соответствии с пакетом go документация , см. Интерфейс Conn.
Я обновил до последней версии, которая является 1.12.1 без изменений.
Работает на MacOS в разработке и CentOS в производстве.
Кто-нибудь сталкивался с этой проблемой?
Если да, то как вы надежно это исправили?
Возможное решение
Мое очень многословное и неприглядное решение, которое, возможно, работает как обходной, заключается в следующем:
1) запустить отправителя в обычной процедуре, как это (выше, без изменений)
2) запустить приемник в обычном режиме, например так (ниже)
func receive(w worker, ch channel) {
request := make(chan []byte, 1)
reader := make(chan []byte, 1)
defer func() {
close(request) // exit signaling
w.c.Close() // exit signaling
//close(reader)
}()
go func() {
// untried senario, for if we still have leaks -> 100% cpu
// we may need to be totally reliant on closing request or ch.quit
// defer w.c.Close()
deadline := 100 * time.Millisecond
var n int
var err error
for buf := range request {
for {
select {
case <-ch.quit: // shutdown signal
return
default:
}
w.c.SetReadDeadline(time.Now().Add(deadline))
n, err = w.c.Read(buf)
if err != nil { // timeout or close
continue
}
break
}
select {
case <-ch.quit: // shutdown signal
return
case reader <- buf[:n]:
//default:
}
}
}()
var buf []byte
out:
for {
request <- make([]byte, messageSize)
select {
case <-ch.quit: // shutting down
break out
case buf = <-reader:
}
update := &update{id: w.id}
... process update logic ...
select {
case <-ch.quit: // shutting down
break out
case ch.update <- update
}
}
Мой вопрос: почему эта ужасная версия 2, которая порождает новую подпрограмму go для чтения из блокирующего c.Read (buf), кажется, работает более надежно, то есть не пропускает при отправке сигнала выключения, когда гораздо более простая первая версия этого не сделала ... и похоже, что по сути это одно и то же из-за блокировки c.Read (buf).
Понижение моего вопроса НЕ ПОЛЕЗНО, если это законный и поддающийся проверке вопрос, вопрос остается без ответа.