Топология Killing Storm после завершения работы Spout - PullRequest
0 голосов
/ 27 марта 2019

Я создал топологию Storm со Spout, который генерирует несколько кортежей для бенчмаркинга.Я хочу остановить / убить мою топологию, как только все кортежи будут извлечены из носика или в топологии больше не будет никаких кортежей.

Вот как выглядит моя топология.

LocalCluster cluster = new LocalCluster();
TopologyBuilder builder = new TopologyBuilder();
Config conf = new Config();
//Disabled ACK'ing for higher throughput
conf.put(Config.TOPOLOGY_ACKER_EXECUTORS, 0); 

LoadGeneratorSource loadGenerator = new LoadGeneratorSource(runtime,numberOfTuplesToBeEmitted);
builder.setSpout("loadGenerator", loadGenerator);

//Some Bolts Here

while (loadGenerator.isRunning()){
//Active Waiting
}
//DO SOME STUFF WITH JAVA
cluster.killTopology("StormBenchmarkTopology");

Проблема в том, что экземпляр loadGenerator, на который я ссылаюсь в этой области, отличается от экземпляра, запущенного в потоке носика.Следовательно, isRuning () всегда возвращает true, хотя в потоке spout его значение равно false, если больше нет кортежей для отправки.

Вот часть класса LoadGeneratorSource.


public class LoadGeneratorSource extends BaseRichSpout {

    private final int throughput;
    private boolean running;
    private final long runtime;


    public LoadGeneratorSource(long runtime,int throughput) {
        this.throughput = throughput;
        this.runtime = runtime;
    }

    @Override
    public void nextTuple() {
        ThroughputStatistics.getInstance().pause(false);

        long endTime = System.currentTimeMillis() + runtime;
        while (running) {
            long startTs = System.currentTimeMillis();

            for (int i = 0; i < throughput; i++) {
                try {
                    emitValue(readNextTuple());
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            while (System.currentTimeMillis() < startTs + 1000) {
                // active waiting
            }

            if (endTime <= System.currentTimeMillis())
                setRunning(false);
        }
    }

    public boolean isRunning() {
        return running;
    }

    public void setRunning(boolean running) {
        this.running = running;
    }

    //MORE STUFF

}

Может кто-нибудь сказать мне способ остановить мою топологию, если больше нет кортежей, испускаемых из потока или вытекающих в топологии?Заранее спасибо за помощь.

1 Ответ

0 голосов
/ 27 марта 2019

Это похоже на дубликат Топология смертельного шторма из носика . Пожалуйста, попробуйте ответ, данный там.

Просто чтобы дать краткое резюме; То, как вы пытаетесь это сделать, не сработает, но вы можете использовать NimbusClient из носика, чтобы попросить Nimbus уничтожить вашу топологию. Дополнительным преимуществом является то, что оно также будет работать после развертывания в реальном кластере.

...