У меня сейчас проблема с кодом, который я написал для Кафки.Код довольно прост, источник производит байтовый массив, а потребитель хочет использовать массив.Проблема в том, что он не отображает никаких ошибок, и я не могу получить требуемый вывод.Я уверен, что входные данные byteArray (Size: 4.06MB) получают правильные значения для передачи в конвейер kafka.Оба метода Consumer не дают требуемого вывода.В методе 1 код не передает условие IF «Consumer.Consume ()», а в методе 2 переменная «msg» принимает значение null.Посоветуйте пожалуйста что я делаю не так.
Kafka Producer:
public class BookingProducer : IBookingProducer
{
public void Produce(byte[] byteArray)
{
var config = new Dictionary<string, object>
{
{ "bootstrap.servers", "localhost:9092"},
{ "message.max.bytes", "5242880"}
};
using (var producer = new Producer<Null, byte[]>(config, null, new ByteArraySerializer()))
{
var result = producer.ProduceAsync("timemanagement_booking", null, byteArray).GetAwaiter().GetResult();
//Console.WriteLine($"Value: {result.Value}");
producer.Flush(10000);
}
}
}
Kafka Consumer Method 1:
public class BookingConsumer
{
public void Listen()
{
var config = new Dictionary<string, object>
{
{ "group.id","booking_consumer" },
{ "bootstrap.servers", "localhost:9092" },
{ "enable.auto.commit", "false" },
{ "fetch.message.max.bytes", "5242880" }
};
using (var consumer = new Consumer<Null, byte[]>(config, null, new ByteArrayDeserializer()))
{
consumer.Subscribe("timemanagement_booking");
while (true)
{
Message<Null, byte[]> msg;
consumer.Poll(10000);
if (consumer.Consume(out msg, TimeSpan.FromSeconds(1)))
{
Console.WriteLine(msg.Value.ToArray());
}
}
}
}
}
Kafka Consumer Method 2:
public class BookingConsumer
{
public void Listen()
{
var config = new Dictionary<string, object>
{
{ "group.id","booking_consumer" },
{ "bootstrap.servers", "localhost:9092" },
{ "enable.auto.commit", "false" },
{ "fetch.message.max.bytes", "5242880" }
};
using (var consumer = new Consumer<Null, byte[]>(config, null, new ByteArrayDeserializer()))
{
consumer.Subscribe("timemanagement_booking");
consumer.OnMessage += (_, msg) =>
{
if (msg != null)
{
Console.WriteLine(msg.Value.ToArray());
}
};
while (true)
{
consumer.Poll(10000);
}
}
}
}