Produce вызывается с настроенным сериализатором значений IAsyncSerializer, но ISerializer требуется при использовании Avro Serializer - PullRequest
0 голосов
/ 18 июня 2020

Я работаю с кластером Kafka и использую Transactional Producer для потоковой передачи atomi c (чтение-процесс-запись).

                // Init Transactions
                _transactionalProducer.InitTransactions(DefaultTimeout);

                // Begin the transaction
                _transactionalProducer.BeginTransaction();

                // produce message to one or many topics
                var topic = Topics.MyTopic;
                _transactionalProducer.Produce(topic, consumeResult.Message);

Я использую AvroSerializer, так как я публикую sh сообщений со схемой.

Produce выдает исключение:

"System.InvalidOperationException: Produce called with an IAsyncSerializer value serializer configured but an ISerializer is required.\r\n   at Confluent.Kafka.Producer`2.Produce(TopicPartition topicPartition, Message`2 message, Action`1 deliveryHandler)"

Все примеры, которые я видел для транзакционного производителя, используют метод Produce, а не ProduceAsyn c, поэтому не уверен, что могу просто переключитесь на ProduceAsyn c и предположите, что транзакционное производство будет работать правильно. Исправьте меня, если я ошибаюсь, или помогите найти документацию.

В противном случае я не смогу найти AvroSerializer, который не является Asyn c и наследуется от ISerializer.

public class AvroSerializer<T> : IAsyncSerializer<T>

1 Ответ

0 голосов
/ 21 июля 2020

Я не осознавал, что существует метод AsSyncOverAsync, который я могу использовать при создании сериализатора. Это существует потому, что Kafka Consumer также по-прежнему Syn c, а не Asyn c.

Например:

new AvroSerializer<TValue>(schemaRegistryClient, serializerConfig).AsSyncOverAsync();

Вот документация Confluent по этому методу.

        //
        // Summary:
        //     Create a sync serializer by wrapping an async one. For more information on the
        //     potential pitfalls in doing this, refer to Confluent.Kafka.SyncOverAsync.SyncOverAsyncSerializer`1.
        public static ISerializer<T> AsSyncOverAsync<T>(this IAsyncSerializer<T> asyncSerializer);
...