Не могли бы вы попытаться также поместить артефакт org.apache.kafka:kafka-clients
в ваши зависимости в той же версии, что и kafka_2.11
?
Что касается использования storm-kafka-client, документацию на странице Storm можно найти по адресуhttps://storm.apache.org/releases/2.0.0-SNAPSHOT/storm-kafka-client.html и примеры на https://github.com/apache/storm/blob/master/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTopologyMainNamedTopics.java
В частности, вы хотите RecordTranslator
.
ByTopicRecordTranslator<String, String> trans = new ByTopicRecordTranslator<>(
(r) -> new Values(r.topic(), r.partition(), r.offset(), r.key(), r.value()),
new Fields("topic", "partition", "offset", "key", "value"), TOPIC_0_1_STREAM);
trans.forTopic(TOPIC_2,
(r) -> new Values(r.topic(), r.partition(), r.offset(), r.key(), r.value()),
new Fields("topic", "partition", "offset", "key", "value"), TOPIC_2_STREAM);
return KafkaSpoutConfig.builder(bootstrapServers, new String[]{TOPIC_0, TOPIC_1, TOPIC_2})
.setProp(ConsumerConfig.GROUP_ID_CONFIG, "kafkaSpoutTestGroup")
.setRetry(getRetryService())
.setRecordTranslator(trans)
.setOffsetCommitPeriodMs(10_000)
.setFirstPollOffsetStrategy(EARLIEST)
.setMaxUncommittedOffsets(250)
.build();
Этот пример, например, выведет тему, раздел, смещение,ключ и значение из каждой записи в перечисленных полях, и будет отправлять кортежи из TOPIC_2 в поток, отличный от других подписанных тем.Если вам не нужны разные схемы для разных тем, вы можете использовать SimpleRecordTranslator
.