Я создал топологию Storm со Spout, который генерирует несколько кортежей для бенчмаркинга.Я хочу остановить / убить мою топологию, как только все кортежи будут извлечены из носика или в топологии больше не будет никаких кортежей.
Вот как выглядит моя топология.
LocalCluster cluster = new LocalCluster();
TopologyBuilder builder = new TopologyBuilder();
Config conf = new Config();
//Disabled ACK'ing for higher throughput
conf.put(Config.TOPOLOGY_ACKER_EXECUTORS, 0);
LoadGeneratorSource loadGenerator = new LoadGeneratorSource(runtime,numberOfTuplesToBeEmitted);
builder.setSpout("loadGenerator", loadGenerator);
//Some Bolts Here
while (loadGenerator.isRunning()){
//Active Waiting
}
//DO SOME STUFF WITH JAVA
cluster.killTopology("StormBenchmarkTopology");
Проблема в том, что экземпляр loadGenerator, на который я ссылаюсь в этой области, отличается от экземпляра, запущенного в потоке носика.Следовательно, isRuning () всегда возвращает true, хотя в потоке spout его значение равно false, если больше нет кортежей для отправки.
Вот часть класса LoadGeneratorSource.
public class LoadGeneratorSource extends BaseRichSpout {
private final int throughput;
private boolean running;
private final long runtime;
public LoadGeneratorSource(long runtime,int throughput) {
this.throughput = throughput;
this.runtime = runtime;
}
@Override
public void nextTuple() {
ThroughputStatistics.getInstance().pause(false);
long endTime = System.currentTimeMillis() + runtime;
while (running) {
long startTs = System.currentTimeMillis();
for (int i = 0; i < throughput; i++) {
try {
emitValue(readNextTuple());
} catch (Exception e) {
e.printStackTrace();
}
}
while (System.currentTimeMillis() < startTs + 1000) {
// active waiting
}
if (endTime <= System.currentTimeMillis())
setRunning(false);
}
}
public boolean isRunning() {
return running;
}
public void setRunning(boolean running) {
this.running = running;
}
//MORE STUFF
}
Может кто-нибудь сказать мне способ остановить мою топологию, если больше нет кортежей, испускаемых из потока или вытекающих в топологии?Заранее спасибо за помощь.