У меня есть простой потребитель kafka, для которого я создал дескриптор и пытаюсь прочитать его с помощью процедуры go:
func process(ctx context.Context){
consumer := queueHandle.Consume(topic_ops_req, consumerHandler)
// Get signal for finish
doneCh := make(chan struct{})
go func(consumer chan *sarama.ConsumerMessage, ctx context.Context) {
for {
select {
case msg, ok := <-consumer:
if !ok {
logger.Info("Channel has been closed")
doneCh <- struct{}{}
return
}
var request queue.Request
err := json.Unmarshal(msg.Value, &request)
if err != nil {
logger.Error("consumer unmarshal err", err)
panic(err)
}
res, err := new_process(ctx, request, service) // call another func
if err != nil {
//TODO
}
result = res
doneCh <- struct{}{}
case <-ctx.Done():
logger.Info(fmt.Sprintf("Context ended with err : %s", ctx.Err()))
doneCh <- struct{}{}
}
}
}(consumer, ctx)
<-doneCh
}
Проблема, которую я вижу, заключается в что после того, как я введу "case <-ctx.Done ()", подпрограмма go не вводит "case msg, ok: = <-consumer" и всегда возвращает завершение контекста. Как мне go fun c работать как с потребительским каналом, так и ctx.Done ()? </p>