Проблема Apache Storm версии 2.0 при локальном запуске топологии с maven - PullRequest
0 голосов
/ 03 июля 2019

Я пытаюсь запустить новую версию Storm (v2.0), но по какой-то причине при локальном запуске все мои выходные комментарии не отображаются в консоли, что приводит меня к выводу, что топология шторма не выполняется ввсе.Вывод из Storm огромен, и я не вижу, в чем проблема.

Это содержимое моего файла pom.xml:

<groupId>com.example.exclamationTopologyEx</groupId>
<artifactId>ExclamationTopologtEx</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-client</artifactId>
        <version>2.0.0</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-server</artifactId>
        <version>2.0.0</version>
    </dependency>
</dependencies>

<build>
    <plugins>
        <plugin>
            <groupId>org.codehaus.mojo</groupId>
            <artifactId>exec-maven-plugin</artifactId>
            <version>1.5.0</version>
            <executions>
                <execution>
                    <goals>
                        <goal>exec</goal>
                    </goals>
                </execution>
            </executions>
            <configuration>
                <executable>java</executable>
                <includeProjectDependencies>true</includeProjectDependencies>
                <includePluginDependencies>false</includePluginDependencies>
                <classpathScope>compile</classpathScope>
                <mainClass>com.stormExample.SquareStormTopology</mainClass>
                <cleanupDaemonThreads>false</cleanupDaemonThreads>
            </configuration>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.6.1</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>
    </plugins>
</build>

Это основной класс для запуска топологии:

public static void main(String[] args) throws Exception {
    //used to build the toplogy
    TopologyBuilder builder = new TopologyBuilder();
    //add the spout with name 'spout' and parallelism hint of 5 executors
    builder.setSpout("spout", new DataSpout(),5);
    //add the Emitter bolt with the name 'emitter'
    builder.setBolt("emitter",new EmitterBolt(),8).shuffleGrouping("spout");

    Config conf = new Config();
    //set to false to disable debug when running on production cluster
    conf.setDebug(false);
    //if there are arguments then we are running on a cluster
    if(args!= null && args.length > 0){
        //parallelism hint to set the number of workers
        conf.setNumWorkers(3);
        //submit the toplogy
        StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
    }else{//we are running locally
        //the maximum number of executors
        conf.setMaxTaskParallelism(3);

        //local cluster used to run locally
        LocalCluster cluster = new LocalCluster();
        //submitting the topology
        cluster.submitTopology("emitter-topology",conf, builder.createTopology());
        //sleep
        Thread.sleep(10000);
        //shut down the cluster
        cluster.shutdown();
    }
}

Кроме того, это мойреализации spout и bolt:

public class DataSpout extends BaseRichSpout {

    SpoutOutputCollector _collector;
    int nextNumber;

    @Override
    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        _collector = spoutOutputCollector;
        nextNumber = 2;
    }

    @Override
    public void nextTuple() {
        if(nextNumber > 20){
            return;
        }else{
            _collector.emit(new Values(nextNumber));

            nextNumber = nextNumber + 2;
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("number"));
    }
}
public class EmitterBolt extends BaseBasicBolt {

    int sumNumbers;

    public EmitterBolt(){

    }

    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        sumNumbers = 0;
        System.out.println("com.example.exclamationTopologyEx.EmitterBolt has been initialized");
    }

    @Override
    public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
        int nextNumber = tuple.getIntegerByField("number");

        sumNumbers+=nextNumber;

        System.out.println("The new sum is: " + sumNumbers);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {

    }
}

Я использую команду maven:

mvn compile exec:java -Dstorm.topology=com.example.exclamationTopologyEx 

для запуска топологии, но, как я сказал, ни одна из строк печати не отображается в консоли.Я пытался найти решение по документации как на веб-сайте, так и в git, но не смог найти ничего, что могло бы заставить эту простую топологию работать.

Я думаю, что проблема может быть в файле pom.xml , мне могут потребоваться некоторые другие зависимости, или у меня есть некоторые неправильные зависимости, но, как я уже сказал, я не могу понять, чтоправильный контент для pom.xml.

Я работаю в Mac OS и использую последнюю версию Java (12.0.1), а в качестве IDE я использую IntelliJ IDEA.

...