Кафка-потребитель не потребляет сообщения - PullRequest
0 голосов
/ 11 мая 2019

Я новичок в Кафке. Потребитель кафки не читает сообщения из данной темы. Я также проверяю консоль kafka. это не работает. Я не понимаю проблемы. раньше все работало нормально.

public string MessageConsumer(string brokerList, List<string> topics, CancellationToken cancellationToken)
    {

        //ConfigurationManager.AutoLoadAppSettings("", "", true);
        string logKey = string.Format("ARIConsumer.StartPRoducer ==>Topics {0} Key{1} =>", "", string.Join(",", topics));

        string message = string.Empty;
        var conf = new ConsumerConfig
        {
            BootstrapServers = "localhost:9092",
            GroupId = "23",
            EnableAutoCommit = false,                
            AutoOffsetReset = AutoOffsetResetType.Latest,
        };

        using (var c = new Consumer<Ignore, string>(conf))
        {
            try
            {
                c.Subscribe(topics);
                bool consuming = true;
                // The client will automatically recover from non-fatal errors. You typically
                // don't need to take any action unless an error is marked as fatal.
                c.OnError += (_, e) => consuming = !e.IsFatal;
                while (consuming)
                {
                    try
                    {
                        TimeSpan timeSpan = new TimeSpan(0, 0, 5);

                        var cr = c.Consume(timeSpan);
                        // Thread.Sleep(5000);
                        if (cr != null)
                        {
                            message = cr.Value;
                            Console.WriteLine("Thread" + Thread.CurrentThread.ManagedThreadId + "Message : " + message);

                            CLogger.WriteLog(ELogLevel.INFO, $"Consumed message Partition '{cr.Partition}' at: '{cr.TopicPartitionOffset} thread: { Thread.CurrentThread.ManagedThreadId}'. Message: {message}");
                            //Console.WriteLine($"Consumed message Partition '{cr.Partition}' at: '{cr.TopicPartitionOffset}'. Topic: { cr.Topic} value :{cr.Value} Timestamp :{DateTime.UtcNow.ToString("yyyy-MM-dd HH:mm:ss.fff", CultureInfo.InvariantCulture)} GrpId: { conf.GroupId}");
                            c.Commit();
                        }
                        Console.WriteLine($"Calling the next Poll ");
                    }

                    catch (ConsumeException e)
                    {
                        CLogger.WriteLog(ELogLevel.ERROR, $"Error occured: {e.Error.Reason}");

                        Console.WriteLine($"Error occured: {e.Error.Reason}");
                    }
                    //consuming = false;
                }
                // Ensure the consumer leaves the group cleanly and final offsets are committed.
                c.Close();
            }
            catch (Exception ex)
            {

            }
        }

        return message;
    }

В чем проблема с этим кодом или есть проблема установки с kafka

1 Ответ

1 голос
/ 11 мая 2019

Есть ли производитель, активно отправляющий данные?

Ваш потребитель начинает с последних смещений, основанных на AutoOffsetReset, поэтому он не будет читать существующие данные в теме

Потребитель консоли также по умолчанию использует самое последнее смещение

И если вы не изменили GroupId, то ваш потребитель мог бы сработать один раз, затем вы использовали данные, а затем зафиксировали смещения для этой группы. Когда потребитель снова начинает работу в той же группе, он возобновляет работу только с конца темы или смещения последнего коммита

У вас также есть пустой catch (Exception ex), который может скрывать какую-то другую ошибку

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...