Go подпрограммы с потребительским каналом и контекстом kafka - PullRequest
0 голосов
/ 13 января 2020

У меня есть простой потребитель kafka, для которого я создал дескриптор и пытаюсь прочитать его с помощью процедуры go:

func process(ctx context.Context){
consumer := queueHandle.Consume(topic_ops_req, consumerHandler)
// Get signal for finish
doneCh := make(chan struct{})
go func(consumer chan *sarama.ConsumerMessage, ctx context.Context) {
    for {
        select {
        case msg, ok := <-consumer:
            if !ok {
                logger.Info("Channel has been closed")
                doneCh <- struct{}{}
                return
            }

            var request queue.Request
            err := json.Unmarshal(msg.Value, &request)
            if err != nil {
                logger.Error("consumer unmarshal err", err)
                panic(err)
            }
            res, err := new_process(ctx, request, service) // call another func
            if err != nil {
                //TODO
            }
            result = res
            doneCh <- struct{}{}

        case <-ctx.Done():
            logger.Info(fmt.Sprintf("Context ended  with err : %s", ctx.Err()))
            doneCh <- struct{}{}

        }
    }
}(consumer, ctx)
<-doneCh

}

Проблема, которую я вижу, заключается в что после того, как я введу "case <-ctx.Done ()", подпрограмма go не вводит "case msg, ok: = <-consumer" и всегда возвращает завершение контекста. Как мне go fun c работать как с потребительским каналом, так и ctx.Done ()? </p>

...