Я отправляю данные, используя метод AvroSpecifi c или AvroGeneri c, и он работает нормально. Проблема возникает, когда я пытаюсь использовать данные, отправленные вышеуказанными методами.
Я использую библиотеки версий:
- Confluent.Kafka v1.4.0
- Confluent.SchemaRegistry v1.4.0
- Confluent.SchemaRegistry.Serdes v1.3.0
Вот мой код потребителя:
var schemaRegistryConfig = new SchemaRegistryConfig
{
Url = schemaRegistryUrl
};
using (var schemaRegistry = new CachedSchemaRegistryClient(schemaRegistryConfig))
using (var consumer =
new ConsumerBuilder<Ignore, GenericRecord>(consumerConfig)
.SetValueDeserializer(new AvroDeserializer<GenericRecord>(schemaRegistry).AsSyncOverAsync())
.Build())
{
consumer.Subscribe(topicName);
try
{
while (true)
{
try
{
var consumeResult = consumer.Consume(token);
//do something
}
catch (ConsumeException ex)
{
logger.LogError($"Error occured: {ex.Error.Reason}", ex);
}
}
}
catch (OperationCanceledException)
{
consumer.Close();
}
}