Я должен получить из топика Кафки c, получить сообщение и выполнить некоторую json работу по очистке и фильтрации, затем мне нужно создать новое сообщение для другой топики Кафки c, мой код такой :
public static YamlMappingNode configs;
public static void Main(string[] args)
{
using (var reader = new StreamReader(Path.Combine(Directory.GetCurrentDirectory(), ".gitlab-ci.yml")))
{
var yaml = new YamlStream();
yaml.Load(reader);
//find variables
configs = (YamlMappingNode)yaml.Documents[0].RootNode;
configs = (YamlMappingNode)configs.Children.Where(k => k.Key.ToString() == "variables")?.FirstOrDefault().Value;
}
CancellationTokenSource cts = new CancellationTokenSource();
Console.CancelKeyPress += (_, e) => {
e.Cancel = true; // prevent the process from terminating.
cts.Cancel();
};
Run_ManualAssign(configs, cts.Token);
}
public static async void Run_ManualAssign(YamlMappingNode configs, CancellationToken cancellationToken)
{
var brokerList = configs.Where(k => k.Key.ToString() == "kfk_broker")?.FirstOrDefault().Value.ToString();
var topics = configs.Where(k => k.Key.ToString() == "input_kfk_topic")?.FirstOrDefault().Value.ToString();
var config = new ConsumerConfig
{
// the group.id property must be specified when creating a consumer, even
// if you do not intend to use any consumer group functionality.
GroupId = new Guid().ToString(),
BootstrapServers = brokerList,
// partition offsets can be committed to a group even by consumers not
// subscribed to the group. in this example, auto commit is disabled
// to prevent this from occurring.
EnableAutoCommit = true
};
using (var consumer =
new ConsumerBuilder<Ignore, string>(config)
.SetErrorHandler((_, e) => Console.WriteLine($"Error: {e.Reason}"))
.Build())
{
//consumer.Assign(topics.Select(topic => new TopicPartitionOffset(topic, 0, Offset.Beginning)).ToList());
consumer.Assign(new TopicPartitionOffset(topics, 0, Offset.End));
//var producer = new ProducerBuilder<Null, string>(config).Build();
try
{
while (true)
{
try
{
var consumeResult = consumer.Consume(cancellationToken);
/// Note: End of partition notification has not been enabled, so
/// it is guaranteed that the ConsumeResult instance corresponds
/// to a Message, and not a PartitionEOF event.
//filter message
var result = ReadMessage(configs, consumeResult.Message.Value);
//send to kafka topic
await Run_ProducerAsync(configs, result);
}
catch (ConsumeException e)
{
Console.WriteLine($"Consume error: {e.Error.Reason}");
}
}
}
catch (OperationCanceledException)
{
Console.WriteLine("Closing consumer.");
consumer.Close();
}
}
}
#endregion
#region Run_Producer
public static async Task Run_ProducerAsync(YamlMappingNode configs, string message)
{
var brokerList = configs.Where(k => k.Key.ToString() == "kfk_broker")?.FirstOrDefault().Value.ToString();
var topicName = configs.Where(k => k.Key.ToString() == "target_kafka_topic")?.FirstOrDefault().Value.ToString();
var config = new ProducerConfig {
BootstrapServers = brokerList,
};
using (var producer = new ProducerBuilder<Null, string>(config).Build())
{
try
{
/// Note: Awaiting the asynchronous produce request below prevents flow of execution
/// from proceeding until the acknowledgement from the broker is received (at the
/// expense of low throughput).
var deliveryReport = await producer.ProduceAsync(topicName, new Message<Null, string> { Value = message });
producer.Flush(TimeSpan.FromSeconds(10));
Console.WriteLine($"delivered to: {deliveryReport.TopicPartitionOffset}");
}
catch (ProduceException<string, string> e)
{
Console.WriteLine($"failed to deliver message: {e.Message} [{e.Error.Code}]");
}
}
}
#endregion
Я что-то здесь не так делаю? Программа существовала сразу при выполнении var deliveryReport = await producer.ProduceAsync(topicName, new Message<Null, string> { Value = message });
, ни сообщения об ошибке, ни кода ошибки.
В то же время я использовал Python и настроил то же самое для Producer
, это хорошо работает.