Кафка потребляет сообщение и потом выдает в другую тему - PullRequest
1 голос
/ 15 апреля 2020

Я должен получить из топика Кафки c, получить сообщение и выполнить некоторую json работу по очистке и фильтрации, затем мне нужно создать новое сообщение для другой топики Кафки c, мой код такой :

public static YamlMappingNode configs;
        public static void Main(string[] args)
        {
            using (var reader = new StreamReader(Path.Combine(Directory.GetCurrentDirectory(), ".gitlab-ci.yml")))
            {
                var yaml = new YamlStream();
                yaml.Load(reader);

                //find variables 
                configs = (YamlMappingNode)yaml.Documents[0].RootNode;
                configs = (YamlMappingNode)configs.Children.Where(k => k.Key.ToString() == "variables")?.FirstOrDefault().Value;

            }

            CancellationTokenSource cts = new CancellationTokenSource();
            Console.CancelKeyPress += (_, e) => {
                e.Cancel = true; // prevent the process from terminating.
                cts.Cancel();
            }; 
            Run_ManualAssign(configs, cts.Token);
        }

 public static async void Run_ManualAssign(YamlMappingNode configs, CancellationToken cancellationToken)
        {
            var brokerList = configs.Where(k => k.Key.ToString() == "kfk_broker")?.FirstOrDefault().Value.ToString();
            var topics = configs.Where(k => k.Key.ToString() == "input_kfk_topic")?.FirstOrDefault().Value.ToString();
            var config = new ConsumerConfig
            {
                // the group.id property must be specified when creating a consumer, even 
                // if you do not intend to use any consumer group functionality.
                GroupId = new Guid().ToString(),
                BootstrapServers = brokerList,
                // partition offsets can be committed to a group even by consumers not
                // subscribed to the group. in this example, auto commit is disabled
                // to prevent this from occurring.
                EnableAutoCommit = true
            };

            using (var consumer =
                new ConsumerBuilder<Ignore, string>(config)
                    .SetErrorHandler((_, e) => Console.WriteLine($"Error: {e.Reason}"))
                    .Build())
            {
                //consumer.Assign(topics.Select(topic => new TopicPartitionOffset(topic, 0, Offset.Beginning)).ToList());
                consumer.Assign(new TopicPartitionOffset(topics, 0, Offset.End));
                //var producer = new ProducerBuilder<Null, string>(config).Build();
                try
                {
                    while (true)
                    {
                        try
                        {
                            var consumeResult = consumer.Consume(cancellationToken);
                            /// Note: End of partition notification has not been enabled, so
                            /// it is guaranteed that the ConsumeResult instance corresponds
                            /// to a Message, and not a PartitionEOF event.

                            //filter message  
                            var result = ReadMessage(configs, consumeResult.Message.Value);
                            //send to kafka topic
                            await Run_ProducerAsync(configs, result);
                        }
                        catch (ConsumeException e)
                        {

                            Console.WriteLine($"Consume error: {e.Error.Reason}");
                        }
                    }
                }
                catch (OperationCanceledException)
                {
                    Console.WriteLine("Closing consumer.");
                    consumer.Close();
                }
            }
        }
        #endregion

        #region Run_Producer
        public static async Task Run_ProducerAsync(YamlMappingNode configs, string message)
        {
            var brokerList = configs.Where(k => k.Key.ToString() == "kfk_broker")?.FirstOrDefault().Value.ToString();
            var topicName = configs.Where(k => k.Key.ToString() == "target_kafka_topic")?.FirstOrDefault().Value.ToString();
            var config = new ProducerConfig {
                BootstrapServers = brokerList,
            };

            using (var producer = new ProducerBuilder<Null, string>(config).Build())
            {
                try
                {
                    /// Note: Awaiting the asynchronous produce request below prevents flow of execution
                    /// from proceeding until the acknowledgement from the broker is received (at the 
                    /// expense of low throughput).
                    var deliveryReport = await producer.ProduceAsync(topicName, new Message<Null, string> { Value = message });
                    producer.Flush(TimeSpan.FromSeconds(10));
                    Console.WriteLine($"delivered to: {deliveryReport.TopicPartitionOffset}");
                }
                catch (ProduceException<string, string> e)
                {
                    Console.WriteLine($"failed to deliver message: {e.Message} [{e.Error.Code}]");
                }
            }
        }
        #endregion

Я что-то здесь не так делаю? Программа существовала сразу при выполнении var deliveryReport = await producer.ProduceAsync(topicName, new Message<Null, string> { Value = message });, ни сообщения об ошибке, ни кода ошибки.

В то же время я использовал Python и настроил то же самое для Producer, это хорошо работает.

Ответы [ 2 ]

0 голосов
/ 21 апреля 2020

Я решил эту проблему, но не знаю почему. Я открыл выпуск здесь.

0 голосов
/ 20 апреля 2020

Run_ManualAssign(configs, cts.Token);

Для этой строки в функции Main вы вызываете asyn c без ожидания в функции syn c. Таким образом, программа завершается сразу после запуска этого вызова (не завершена, поскольку она асинхронная c)

Вы можете иметь 2 варианта

  1. Использовать асинхронную c Основная функция и добавьте ожидание перед этим вызовом.

    https://docs.microsoft.com/en-us/dotnet/csharp/language-reference/proposals/csharp-7.1/async-main

  2. Если вы действительно хотите вызвать асинхронную c функцию в синхронном режиме c функцию

    Run_ManualAssign(configs, ts.Token).ConfigureAwait(false).GetAwaiter().GetResult();

...