Storm 1.2.2 и Kafka версии 2.x - PullRequest
0 голосов
/ 23 апреля 2019

Я тестирую случай, используя Storm 1.2.2 и Kafka 2.x в качестве моего Spout. Поэтому я создал LocalCluster только для целей тестирования.

  TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("kafka_spout", new KafkaSpout<>(KafkaSpoutConfig.builder("MYKAFKAIP:9092", "storm-test-dpi").build()), 1);
        builder.setBolt("bolt", new LoggerBolt()).shuffleGrouping("kafka_spout");

        LocalCluster localCluster = new LocalCluster();
        localCluster.submitTopology("kafkaBoltTest", new Config(), builder.createTopology());
        Utils.sleep(10000);

После инициализации этого приложения я получил следующее:

9293 [Thread-20-kafka_spout-executor[3 3]] INFO  o.a.k.c.u.AppInfoParser - Kafka version : 0.10.1.0
9293 [Thread-20-kafka_spout-executor[3 3]] INFO  o.a.k.c.u.AppInfoParser - Kafka commitId : 3402a74efb23d1d4

И после большой ошибки:

9664 [Thread-20-kafka_spout-executor[3 3]] INFO  o.a.s.k.s.KafkaSpout - Initialization complete
9703 [Thread-20-kafka_spout-executor[3 3]] WARN  o.a.k.c.c.i.Fetcher - Unknown error fetching data for topic-partition storm-test-dpi-0
9714 [Thread-20-kafka_spout-executor[3 3]] WARN  o.a.k.c.c.i.Fetcher - Unknown error fetching data for topic-partition storm-test-dpi-0
9742 [Thread-20-kafka_spout-executor[3 3]] WARN  o.a.k.c.c.i.Fetcher - Unknown error fetching data for topic-partition storm-test-dpi-0
9756 [Thread-20-kafka_spout-executor[3 3]] WARN  o.a.k.c.c.i.Fetcher - Unknown error fetching data for topic-partition storm-test-dpi-0
9767 [Thread-20-kafka_spout-executor[3 3]] WARN  o.a.k.c.c.i.Fetcher - Unknown error fetching data for topic-partition storm-test-dpi-0
9781 [Thread-20-kafka_spout-executor[3 3]] WARN  o.a.k.c.c.i.Fetcher - Unknown error fetching data for topic-partition storm-test-dpi-0
9806 [Thread-20-kafka_spout-executor[3 3]] WARN  o.a.k.c.c.i.Fetcher - Unknown error fetching data for topic-partition storm-test-dpi-0

Я думаю, что эта проблема из-за версии Kafka, как вы видите, журнал показывает версию "0.10.1.0", но моя версия Kafka - "2.x".

Это мой pom.xml:

 <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-core</artifactId>
            <version>${version.storm}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-kafka-client</artifactId>
            <version>${version.storm}</version>
        </dependency>

Где ${version.storm} - 1,2.2

1 Ответ

1 голос
/ 23 апреля 2019

Вы также должны объявить версию kafka-clients, которую вы используете.storm-kafka-client POM устанавливает область действия kafka-clients на provided.Это означает, что kafka-clients не будет включен при сборке.Мы делаем это, чтобы вы могли легко обновить.

Причина, по которой он работает даже для вас, заключается в том, что вы используете LocalCluster в некотором тестовом коде, где присутствуют provided зависимости.

Добавьте это кваш POM, и он должен работать:

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>your-kafka-version-here</version>
        </dependency>
...