KafkaStreams Как создать экземпляр ConsumerRecordFactory? - PullRequest
0 голосов
/ 15 ноября 2018

Я пытаюсь использовать ConsumerRecordFactory, предоставленный Kafka Streams, следуя главным образом confluent doc , чтобы протестировать потоковое приложение, вот код, который у меня есть:

// Properties of the application
Properties streamsConfiguration = new Properties();

// Give the Streams application a unique name.  The name must be unique in the Kafka cluster
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "testing_application");
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummyserver:2181");

// Create the topology builder
StreamsBuilder builder = new StreamsBuilder();

// Run it on the test driver
TopologyTestDriver testDriver = new TopologyTestDriver(builder.build(), streamsConfiguration);

// Feed input data
ConsumerRecordFactory<String, Integer> factory = new ConsumerRecordFactory<>(
        "input-topic",
        new StringSerializer(),
        new IntegerSerializer()
);

// Create a test record
ConsumerRecordFactory<byte[], byte[]> record = factory.create("key", 42L);

Моя проблема в том, что когда я компилирую свой код, я получаю следующую ошибку:

Error:(70, 52) java: reference to create is ambiguous
  both method create(K,V,long) in org.apache.kafka.streams.test.ConsumerRecordFactory
  and method create(java.lang.String,V,long) in org.apache.kafka.streams.test.ConsumerRecordFactory match

Итак, я понимаю, что потоки kafka определяют общий метод create(K,V,long) и что, когда я создаю свою фабрику с неуниверсальными типами, я создаю новый метод, который находится в конфликте с первым.

У меня вопрос, как я должен создать свою ConsumerRecordFactory?

Я попытался сделать свой завод более общим с ConsumerRecordFactory<Object, Integer>, но тогда выведенный тип не совпадает. И я не могу найти другой пример, что Confluent github repo kafka-streams-examples , похоже, не использует ConsumerRecordFactory, а этот SO-ответ , похоже, использует тот же код в документации.

(я знаю, что проблема скорее в java, чем в потоках kafka, но я подумал, что пометить его apache-kafka-streams - это хороший способ привлечь людей, привыкших к ConsumerRecordFactory)

1 Ответ

0 голосов
/ 15 ноября 2018

В приведенном ниже коде есть некоторые проблемы:

// Feed input data ConsumerRecordFactory<String, Integer> factory = new ConsumerRecordFactory<>(
        "input-topic",
        new StringSerializer(),
        new IntegerSerializer() );

// Create a test record
ConsumerRecordFactory<byte[], byte[]> record = factory.create("key", 42L);
  1. Вы определили valueType как Integer в ConsumerRecordFactory, но в методе create() вы передаете Long значение типа.
  2. factory.create() возвращает ConsumerRecord вместо ConsumerRecordFactory.

Что касается неоднозначности метода, вы правы. Поэтому, чтобы избежать этой проблемы, используйте следующее:

ConsumerRecordFactory<String, Integer> factory = new ConsumerRecordFactory<>( 
        new StringSerializer(),
        new IntegerSerializer()
);
// Use ConsumerRecord here instead of ConsumerRecordFactory
ConsumerRecord<byte[], byte[]> record = factory.create("input-topic","key", 42);
...