В настоящее время я работаю с примером для потребителя Producer, использующим кластер kubernetes для запуска установки kafka-zookeeper. Мой продюсер создан как RestAPI, может с помощью почтового запроса отправить строку в очередь в kafka,
Мой потребитель - consoleapp, который некоторое время истинно l oop продолжает прослушивать новые сообщения в определенном топи c.
Кажется, мой продюсер легко может отправить sh сообщение в очередь, но мой потребитель, кажется, может потреблять сообщения? Только если я сделаю это как запрос GET, смогу ли я получить сообщения из очереди?
Я получаю сообщение об ошибке, в котором говорится, что по какой-то причине невозможно установить соединение?
Я не уверен, что понимаю, что тип приложения должен делать здесь? Почему это работает в одном случае, а не в другом?
Вывод приложения консоли потребителя:
Hello World!
Consumer created!
Assigned to a Topic!
Ready to Consume!
Listen!
%3|1578587823.362|FAIL|rdkafka#consumer-1| [thrd:10.1.1.139:9092/0]: 10.1.1.139:9092/0: Connect to ipv4#10.1.1.139:9092 failed: A connection attempt failed because the connected party did not properly respond after a period of time, or established connection failed because connected host has failed to respond... (after 21014ms in state CONNECT)
%3|1578587826.949|ERROR|rdkafka#consumer-1| [thrd:10.1.1.139:9092/0]: 10.1.1.139:9092/0: Connect to ipv4#10.1.1.139:9092 failed: A connection attempt failed because the connected party did not properly respond after a period of time, or established connection failed because connected host has failed to respond... (after 21014ms in state CONNECT)
%3|1578587824.343|FAIL|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: Connect to ipv4#10.1.1.139:9092 failed: A connection attempt failed because the connected party did not properly respond after a period of time, or established connection failed because connected host has failed to respond... (after 21010ms in state CONNECT)
%3|1578587826.956|ERROR|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: Connect to ipv4#10.1.1.139:9092 failed: A connection attempt failed because the connected party did not properly respond after a period of time, or established connection failed because connected host has failed to respond... (after 21010ms in state CONNECT)
Код приложения консоли потребителя:
Console.WriteLine("Hello World!");
var config = new ConsumerConfig
{
BootstrapServers = "192.168.68.68:31000",
// the group.id property must be specified when creating a consumer, even
// if you do not intend to use any consumer group functionality.
GroupId = "ThisisaGroup",
// partition offsets can be committed to a group even by consumers not
// subscribed to the group. in this example, auto commit is disabled
// to prevent this from occurring.
EnableAutoCommit = true
};
using (var c = new ConsumerBuilder<Ignore, string>(config).Build())
{
Console.WriteLine("Consumer created!");
string topicId = "test";
c.Assign(new TopicPartitionOffset(topicId, 0, Offset.Beginning));
Console.WriteLine("Assigned to a Topic!");
CancellationTokenSource cts = new CancellationTokenSource();
try
{
Console.WriteLine($"Ready to Consume!");
while (!cts.IsCancellationRequested)
{
Console.WriteLine($"Listen!");
try
{
var cr = c.Consume(cts.Token);
if (cr.IsPartitionEOF)
{
Console.WriteLine(
$"Reached end of topic {cr.Topic}, partition {cr.Partition}, offset {cr.Offset}.");
continue;
}
Console.WriteLine($"Consumed message: { cr.Message.Value} its position {cr.TopicPartitionOffset}");
}
catch (ConsumeException e)
{
Console.WriteLine($"Error occured: {e}");
}
}
}
catch (OperationCanceledException e)
{
// Ensure the consumer leaves the group cleanly and final offsets are committed.
Console.WriteLine($"Error occured: {e.Data}");
c.Close();
}
}
}
Почтовый запрос производителя:
public string PostAsync([FromBody] JObject value)
{
var config = new ProducerConfig
{
BootstrapServers = "192.168.68.68:31000",
ClientId = Dns.GetHostName()
};
using (var producer = new ProducerBuilder<string, string>(config).Build())
{
try
{
producer.Produce("test", new Message<string, string> { Key = "1", Value = value.ToString() });
producer.Flush();
return "ok!" + producer.Name;
}
catch (ProduceException<Null, string> e)
{
return e.ToString();
}
}
}
Получить запросT:
public string Get()
{
Console.WriteLine("Hello World!");
var config = new ConsumerConfig
{
BootstrapServers = "192.168.68.68:31000",
// the group.id property must be specified when creating a consumer, even
// if you do not intend to use any consumer group functionality.
GroupId = "ThisisaGroup",
// partition offsets can be committed to a group even by consumers not
// subscribed to the group. in this example, auto commit is disabled
// to prevent this from occurring.
EnableAutoCommit = true,
Debug = "broker"
};
using (var c = new ConsumerBuilder<Ignore, string>(config).Build())
{
Console.WriteLine("Consumer created!");
string topicId = "test";
c.Assign(new TopicPartitionOffset(topicId, 0, Offset.Beginning));
Console.WriteLine("Assigned to a Topic!");
CancellationTokenSource cts = new CancellationTokenSource();
try
{
Console.WriteLine($"Ready to Consume!");
while (!cts.IsCancellationRequested)
{
Console.WriteLine($"Listen!");
try
{
var cr = c.Consume(cts.Token);
if (cr.IsPartitionEOF)
{
return
$"Reached end of topic {cr.Topic}, partition {cr.Partition}, offset {cr.Offset}.";
continue;
}
return $"Consumed message: { cr.Message.Value} its position {cr.TopicPartitionOffset}";
}
catch (ConsumeException e)
{
return $"Error occured: {e}";
}
}
}
catch (OperationCanceledException e)
{
// Ensure the consumer leaves the group cleanly and final offsets are committed.
Console.WriteLine($"Error occured: {e.Data}");
c.Close();
}
return "Some";
}
}