Исключение в KafkaSpout - PullRequest
       19

Исключение в KafkaSpout

0 голосов
/ 27 августа 2018

Я получаю следующую исключительную топологию шторма.

java.lang.NoSuchMethodError: org.apache.kafka.common.network.NetworkSend.<init>(Ljava/lang/String;[Ljava/nio/ByteBuffer;)V
    at kafka.network.RequestOrResponseSend.<init>(RequestOrResponseSend.scala:41) ~[stormjar.jar:?]
    at kafka.network.RequestOrResponseSend.<init>(RequestOrResponseSend.scala:44) ~[stormjar.jar:?]
    at kafka.network.BlockingChannel.send(BlockingChannel.scala:112) ~[stormjar.jar:?]
    at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:98) ~[stormjar.jar:?]
    at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:83) ~[stormjar.jar:?]
    at kafka.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:149) ~[stormjar.jar:?]
    at kafka.javaapi.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:79) ~[stormjar.jar:?]
    at org.apache.storm.kafka.KafkaUtils.getOffset(KafkaUtils.java:81) ~[stormjar.jar:?]
    at org.apache.storm.kafka.KafkaUtils.getOffset(KafkaUtils.java:71) ~[stormjar.jar:?]
    at org.apache.storm.kafka.PartitionManager.<init>(PartitionManager.java:135) ~[stormjar.jar:?]
    at org.apache.storm.kafka.ZkCoordinator.refresh(ZkCoordinator.java:110) ~[stormjar.jar:?]
    at org.apache.storm.kafka.ZkCoordinator.getMyManagedPartitions(ZkCoordinator.java:71) ~[stormjar.jar:?]
    at org.apache.storm.kafka.KafkaSpout.nextTuple(KafkaSpout.java:135) ~[stormjar.jar:?]
    at org.apache.storm.daemon.executor$fn__10727$fn__10742$fn__10773.invoke(executor.clj:654) ~[storm-core-1.2.2.jar:1.2.2]
    at org.apache.storm.util$async_loop$fn__553.invoke(util.clj:484) [storm-core-1.2.2.jar:1.2.2]
    at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_151]

Конфигурация POM:

<dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-core</artifactId>
            <!-- <version>0.10.0</version> -->
            <version>1.2.2</version>
            <scope>provided</scope>
            <exclusions>
                <exclusion>
                    <artifactId>log4j-core</artifactId>
                    <groupId>org.apache.logging.log4j</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>log4j-api</artifactId>
                    <groupId>org.apache.logging.log4j</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-kafka</artifactId>
            <!-- <version>0.10.0</version> -->
            <version>1.2.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.11</artifactId>
            <version>1.0.1</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.zookeeper</groupId>
                    <artifactId>zookeeper</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>log4j</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

Я использую библиотеку storm-kafka, которая устарела . Если это является причиной вышеупомянутого исключения, тогда дайте мне знать, как создать носик kafka, используя библиотеку storm-kafka-client, и передать ей собственную схему .

Спасибо.

1 Ответ

0 голосов
/ 27 августа 2018

Не могли бы вы попытаться также поместить артефакт org.apache.kafka:kafka-clients в ваши зависимости в той же версии, что и kafka_2.11?

Что касается использования storm-kafka-client, документацию на странице Storm можно найти по адресуhttps://storm.apache.org/releases/2.0.0-SNAPSHOT/storm-kafka-client.html и примеры на https://github.com/apache/storm/blob/master/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTopologyMainNamedTopics.java

В частности, вы хотите RecordTranslator.

ByTopicRecordTranslator<String, String> trans = new ByTopicRecordTranslator<>(
            (r) -> new Values(r.topic(), r.partition(), r.offset(), r.key(), r.value()),
            new Fields("topic", "partition", "offset", "key", "value"), TOPIC_0_1_STREAM);
        trans.forTopic(TOPIC_2,
            (r) -> new Values(r.topic(), r.partition(), r.offset(), r.key(), r.value()),
            new Fields("topic", "partition", "offset", "key", "value"), TOPIC_2_STREAM);
        return KafkaSpoutConfig.builder(bootstrapServers, new String[]{TOPIC_0, TOPIC_1, TOPIC_2})
            .setProp(ConsumerConfig.GROUP_ID_CONFIG, "kafkaSpoutTestGroup")
            .setRetry(getRetryService())
            .setRecordTranslator(trans)
            .setOffsetCommitPeriodMs(10_000)
            .setFirstPollOffsetStrategy(EARLIEST)
            .setMaxUncommittedOffsets(250)
.build();

Этот пример, например, выведет тему, раздел, смещение,ключ и значение из каждой записи в перечисленных полях, и будет отправлять кортежи из TOPIC_2 в поток, отличный от других подписанных тем.Если вам не нужны разные схемы для разных тем, вы можете использовать SimpleRecordTranslator.

...