Я работаю с кластером Kafka и использую Transactional Producer для потоковой передачи atomi c (чтение-процесс-запись).
// Init Transactions
_transactionalProducer.InitTransactions(DefaultTimeout);
// Begin the transaction
_transactionalProducer.BeginTransaction();
// produce message to one or many topics
var topic = Topics.MyTopic;
_transactionalProducer.Produce(topic, consumeResult.Message);
Я использую AvroSerializer, так как я публикую sh сообщений со схемой.
Produce выдает исключение:
"System.InvalidOperationException: Produce called with an IAsyncSerializer value serializer configured but an ISerializer is required.\r\n at Confluent.Kafka.Producer`2.Produce(TopicPartition topicPartition, Message`2 message, Action`1 deliveryHandler)"
Все примеры, которые я видел для транзакционного производителя, используют метод Produce, а не ProduceAsyn c, поэтому не уверен, что могу просто переключитесь на ProduceAsyn c и предположите, что транзакционное производство будет работать правильно. Исправьте меня, если я ошибаюсь, или помогите найти документацию.
В противном случае я не смогу найти AvroSerializer, который не является Asyn c и наследуется от ISerializer.
public class AvroSerializer<T> : IAsyncSerializer<T>