// ConsumeClaim forwards every message received from claim to sc.messages
// until either the claim's message channel is closed or the context returned
// by sc.config.Context() is cancelled.
//
// The hand-off to sc.messages is guarded by the same context: a plain
// blocking send would leave this method stuck if cancellation arrived while
// nobody was receiving from sc.messages. When cancellation wins that race the
// pending message is not forwarded.
//
// The method returns nil on every exit path. sess is not used.
//
// NOTE(review): sarama's ConsumerGroupHandler documentation recommends that
// ConsumeClaim also return once sess.Context() is done (e.g. on rebalance);
// confirm whether sc.config.Context() already covers that case.
func (sc *saramaConsumer) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	defer func() {
		// Runs on every return below, including the cancellation paths.
		sc.logger.Debug("message channel is closed returninig from consume Cliam", "config", sc.config)
	}()
	for {
		select {
		case msg, ok := <-claim.Messages():
			if !ok {
				// The message channel was closed: nothing left to consume.
				return nil
			}
			// Either the message is handed over, or cancellation is
			// observed; the send must never block unconditionally.
			select {
			case sc.messages <- msg:
			case <-sc.config.Context().Done():
				return nil
			}
		case <-sc.config.Context().Done():
			return nil
		}
	}
}
// ConsumeClaim is an alternative formulation that needs no extra goroutine.
// It relays messages from claim into sc.messages and gives up as soon as the
// context returned by sc.config.Context() is done, whether that happens while
// waiting for the next message or while waiting for sc.messages to accept one.
//
// It returns nil in all cases: when the claim's message channel is closed and
// when the context is cancelled. sess is not used.
//
// NOTE(review): sarama's ConsumerGroupHandler documentation recommends also
// returning once sess.Context() is done; confirm whether that is needed here.
func (sc *saramaConsumer) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	defer func() {
		// Logged on every exit path of this method.
		sc.logger.Debug("message channel is closed returninig from consume Cliam", "config", sc.config)
	}()

	for {
		select {
		case <-sc.config.Context().Done():
			// Cancelled while idle, waiting for the next message.
			return nil

		case next, open := <-claim.Messages():
			if !open {
				// Message channel closed: the claim is finished.
				return nil
			}
			// Hand the message over, unless cancellation arrives first.
			select {
			case <-sc.config.Context().Done():
				return nil
			case sc.messages <- next:
			}
		}
	}
}