Разные результаты при использовании одного и того же кода в разных типах приложений - PullRequest
1 голос
/ 09 января 2020

В настоящее время я работаю с примером для потребителя Producer, использующим кластер kubernetes для запуска установки kafka-zookeeper. Мой продюсер создан как RestAPI, может с помощью почтового запроса отправить строку в очередь в kafka,

Мой потребитель - consoleapp, который некоторое время истинно l oop продолжает прослушивать новые сообщения в определенном топи c.

Кажется, мой продюсер легко может отправить sh сообщение в очередь, но мой потребитель, кажется, может потреблять сообщения? Только если я сделаю это как запрос GET, смогу ли я получить сообщения из очереди?

Я получаю сообщение об ошибке, в котором говорится, что по какой-то причине невозможно установить соединение?

Я не уверен, что понимаю, что тип приложения должен делать здесь? Почему это работает в одном случае, а не в другом?

Вывод приложения консоли потребителя:

Hello World!
Consumer created!
Assigned to a Topic!
Ready to Consume!
Listen!
%3|1578587823.362|FAIL|rdkafka#consumer-1| [thrd:10.1.1.139:9092/0]: 10.1.1.139:9092/0: Connect to ipv4#10.1.1.139:9092 failed: A connection attempt failed because the connected party did not properly respond after a period of time, or established connection failed because connected host has failed to respond... (after 21014ms in state CONNECT)
%3|1578587826.949|ERROR|rdkafka#consumer-1| [thrd:10.1.1.139:9092/0]: 10.1.1.139:9092/0: Connect to ipv4#10.1.1.139:9092 failed: A connection attempt failed because the connected party did not properly respond after a period of time, or established connection failed because connected host has failed to respond... (after 21014ms in state CONNECT)
%3|1578587824.343|FAIL|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: Connect to ipv4#10.1.1.139:9092 failed: A connection attempt failed because the connected party did not properly respond after a period of time, or established connection failed because connected host has failed to respond... (after 21010ms in state CONNECT)
%3|1578587826.956|ERROR|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: Connect to ipv4#10.1.1.139:9092 failed: A connection attempt failed because the connected party did not properly respond after a period of time, or established connection failed because connected host has failed to respond... (after 21010ms in state CONNECT)

Код приложения консоли потребителя:

Console.WriteLine("Hello World!");
var config = new ConsumerConfig
{
    BootstrapServers = "192.168.68.68:31000",
    // the group.id property must be specified when creating a consumer, even 
    // if you do not intend to use any consumer group functionality.
    GroupId = "ThisisaGroup",
    // partition offsets can be committed to a group even by consumers not
    // subscribed to the group. in this example, auto commit is disabled
    // to prevent this from occurring.
    EnableAutoCommit = true
};

using (var c = new ConsumerBuilder<Ignore, string>(config).Build())
{
    Console.WriteLine("Consumer created!");
    string topicId = "test";
    c.Assign(new TopicPartitionOffset(topicId, 0, Offset.Beginning));
    Console.WriteLine("Assigned to a Topic!");
    CancellationTokenSource cts = new CancellationTokenSource();

    try
    {
        Console.WriteLine($"Ready to Consume!");
        while (!cts.IsCancellationRequested)
        {
            Console.WriteLine($"Listen!");
            try
            {
                var cr = c.Consume(cts.Token);

                if (cr.IsPartitionEOF)
                {
                    Console.WriteLine(
                        $"Reached end of topic {cr.Topic}, partition {cr.Partition}, offset {cr.Offset}.");

                    continue;
                }

                Console.WriteLine($"Consumed message: { cr.Message.Value} its position {cr.TopicPartitionOffset}");
            }
            catch (ConsumeException e)
            {
                Console.WriteLine($"Error occured: {e}");
            }
        }
    }
    catch (OperationCanceledException e)
    {
        // Ensure the consumer leaves the group cleanly and final offsets are committed.
        Console.WriteLine($"Error occured: {e.Data}");
        c.Close();
    }
}

}

Почтовый запрос производителя:

public string PostAsync([FromBody] JObject value)
        {
            var config = new ProducerConfig
            {
                BootstrapServers = "192.168.68.68:31000",
                ClientId = Dns.GetHostName()
            };

            using (var producer = new ProducerBuilder<string, string>(config).Build())
            {
                try
                {
                    producer.Produce("test", new Message<string, string> { Key = "1", Value = value.ToString() });
                    producer.Flush();
                    return "ok!" + producer.Name;
                }
                catch (ProduceException<Null, string> e)
                {
                    return e.ToString();
                }
            }
         }

Получить запросT:

public string Get()
        {
            Console.WriteLine("Hello World!");
            var config = new ConsumerConfig
            {
                BootstrapServers = "192.168.68.68:31000",
                // the group.id property must be specified when creating a consumer, even 
                // if you do not intend to use any consumer group functionality.
                GroupId = "ThisisaGroup",
                // partition offsets can be committed to a group even by consumers not
                // subscribed to the group. in this example, auto commit is disabled
                // to prevent this from occurring.
                EnableAutoCommit = true,
                Debug = "broker"
            };

            using (var c = new ConsumerBuilder<Ignore, string>(config).Build())
            {
                Console.WriteLine("Consumer created!");
                string topicId = "test";
                c.Assign(new TopicPartitionOffset(topicId, 0, Offset.Beginning));
                Console.WriteLine("Assigned to a Topic!");
                CancellationTokenSource cts = new CancellationTokenSource();

                try
                {
                    Console.WriteLine($"Ready to Consume!");
                    while (!cts.IsCancellationRequested)
                    {
                        Console.WriteLine($"Listen!");
                        try
                        {
                            var cr = c.Consume(cts.Token);

                            if (cr.IsPartitionEOF)
                            {
                                return 
                                    $"Reached end of topic {cr.Topic}, partition {cr.Partition}, offset {cr.Offset}.";

                                continue;
                            }

                            return $"Consumed message: { cr.Message.Value} its position {cr.TopicPartitionOffset}";
                        }
                        catch (ConsumeException e)
                        {
                            return $"Error occured: {e}";
                        }
                    }
                }
                catch (OperationCanceledException e)
                {
                    // Ensure the consumer leaves the group cleanly and final offsets are committed.
                    Console.WriteLine($"Error occured: {e.Data}");
                    c.Close();
                }
                return "Some";
            }

        }
...