Отправка байтового массива в кафку .Net - PullRequest
0 голосов
/ 21 мая 2018

У меня сейчас проблема с кодом, который я написал для Кафки.Код довольно прост, источник производит байтовый массив, а потребитель хочет использовать массив.Проблема в том, что он не отображает никаких ошибок, и я не могу получить требуемый вывод.Я уверен, что входные данные byteArray (Size: 4.06MB) получают правильные значения для передачи в конвейер kafka.Оба метода Consumer не дают требуемого вывода.В методе 1 код не передает условие IF «Consumer.Consume ()», а в методе 2 переменная «msg» принимает значение null.Посоветуйте пожалуйста что я делаю не так.

    Kafka Producer:
        public class BookingProducer : IBookingProducer
            {
                public void Produce(byte[] byteArray)
                {
                    var config = new Dictionary<string, object>
                    {
                        { "bootstrap.servers", "localhost:9092"},
                        { "message.max.bytes", "5242880"}
                    };

                    using (var producer = new Producer<Null, byte[]>(config, null, new ByteArraySerializer()))
                    {
                        var result = producer.ProduceAsync("timemanagement_booking", null, byteArray).GetAwaiter().GetResult();
                        //Console.WriteLine($"Value: {result.Value}");
                        producer.Flush(10000);
                    }
                }
            }

    Kafka Consumer Method 1:
    public class BookingConsumer
        {
            public void Listen()
            {
                var config = new Dictionary<string, object>
                {
                    { "group.id","booking_consumer" },
                    { "bootstrap.servers", "localhost:9092" },
                    { "enable.auto.commit", "false" },
                    { "fetch.message.max.bytes", "5242880" }
                };

                using (var consumer = new Consumer<Null, byte[]>(config, null, new ByteArrayDeserializer()))
                {
                    consumer.Subscribe("timemanagement_booking");

                    while (true)
                    {
                        Message<Null, byte[]> msg;
                        consumer.Poll(10000);
                        if (consumer.Consume(out msg, TimeSpan.FromSeconds(1)))
                        {
                            Console.WriteLine(msg.Value.ToArray());
                        }
                    }
                }
            }
        }
Kafka Consumer Method 2:
public class BookingConsumer
    {
        public void Listen()
        {
            var config = new Dictionary<string, object>
            {
                { "group.id","booking_consumer" },
                { "bootstrap.servers", "localhost:9092" },
                { "enable.auto.commit", "false" },
                { "fetch.message.max.bytes", "5242880" }
            };

            using (var consumer = new Consumer<Null, byte[]>(config, null, new ByteArrayDeserializer()))
            {
                consumer.Subscribe("timemanagement_booking");
                consumer.OnMessage += (_, msg) =>
                {
                    if (msg != null)
                    {
                        Console.WriteLine(msg.Value.ToArray());
                    }
                };
                while (true)
                {
                    consumer.Poll(10000);
                }
            }
        }
    }
...