У меня есть топология Storm, которая читает с удаленного сервера, а затем обрабатывает текст в его болтах.
Я хочу деактивировать / отключить носик, когда он обнаружит, что для чтения не осталось данных, а болтам оставалось время завершить их обработку.
Я пытался использовать метод deactivate () в носике, вызывая его, когда обнаруживает, что для чтения не осталось данных. Я не мог понять, как заставить это работать (не мог найти много документации об этом).
Я решил продолжить с вызова метода, который я написал ниже в носике, когда обнаружил, что не осталось данных для чтения. Я использовал KillOptions при закрытии топологии, чтобы указать некоторое время ожидания, но оно продолжает уничтожать его, как только метод вызывается без ожидания.
private void endReading() {
Map conf = Utils.readStormConfig();
conf.put("nimbus.seeds", "localhost");
NimbusClient cc = NimbusClient.getConfiguredClient(conf);
Nimbus.Client client = cc.getClient();
try {
KillOptions ko = new KillOptions();
ko.set_wait_secs(600);
ko.set_wait_secs_isSet(true);
client.killTopologyWithOpts("local-topology", ko);
} catch (TException e) {
e.printStackTrace();
}
}
Я неправильно использую KillOptions? Есть ли более простой способ закрыть излив, давая время для завершения болтов? Любой совет будет оценен.
РЕДАКТИРОВАТЬ: я запускаю это локально, что является причиной, почему это не сработало, как объяснено в продолжение ответа.