Как написать функцию, которая не зависает при чтении с канала и переходе на другой канал - PullRequest
0 голосов
/ 08 февраля 2019

Рассмотрим функцию, подобную этой:

func (sc *saramaConsumer) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {    

    for msg  := range claim.Messages() {
         sc.messages <- msg //this would hang the call if no one is reading from sc.messages and u can never exit consume cliam
     }
}

1 Ответ

0 голосов
/ 08 февраля 2019
//we can use context to exit when some one called context cancel.
func (sc *saramaConsumer) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {    

    defer func() {
        sc.logger.Debug("message channel is closed returninig from consume Cliam", "config", sc.config)
    }()

    for {
        select {
        //if message channel is closed we exit
        case msg, ok := <-claim.Messages():
            if !ok {
                return nil
            }
            // still this would hang if we some how come here and some one called context cancel when we were here. so only solution is use go routine and launch sc.messages read there 
            // and do a time.after or use another channel to see if we can push or not.
             sc.messages <- msg 

            break
        case <-sc.config.Context().Done():
            return nil
        }
    }
}


// this is another way we can write up with out using go routine.
func (sc *saramaConsumer) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {    

    defer func() {
        sc.logger.Debug("message channel is closed returninig from consume Cliam", "config", sc.config)
    }()

    for {
        select {
        //if message channel is closed we exit
        case msg, ok := <-claim.Messages():
            if !ok {
                return nil
            }
            //we either exit on context being done or while pushing to chan.
            select {
            case sc.messages <- msg:
            case <-sc.config.Context().Done():
                return nil
            }

            break
        case <-sc.config.Context().Done():
            return nil
        }
    }
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...