Подождите, пока submitToplogy закончит - PullRequest
0 голосов
/ 22 мая 2018

Я читаю штормовую книгу.Я нашел следующий фрагмент кода в книге

LocalCluster lc = new LocalCluster()
lc.submitTopology("GitHub-commit-count-topology"), config, topology);
Utils.sleep(TEN_MINUTES)
lc.killTopology("GitHub-commit-count-topology")
lc.shutdown()

Так что этот код отправит топологию для выполнения, подождет фиксированные 10 минут, а затем уничтожит топологию.Но это странно.Как мне сказать.submitTopology дождитесь его завершения и завершения.kill и shutdown.

Как и в Akka Streams, мы получаем Future[Done], и мы просто ждем завершения этого будущего.(вместо фиксированных 10 минут).

1 Ответ

0 голосов
/ 22 мая 2018

Вы можете сделать это с помощью https://github.com/apache/storm/blob/master/storm-server/src/main/java/org/apache/storm/Testing.java#L376.

Причина, по которой в некоторых случаях это не используется, заключается в том, что для реализации интерфейса CompletableSpout требуется каждый носик в топологии https://github.com/apache/storm/blob/4137328b75c06771f84414c3c2113e2d1c757c08/storm-client/src/jvm/org/apache/storm/testing/CompletableSpout.java.

Большинство носителей Storm никогда не достигают точки, когда они «сделаны» (поскольку это среда потоковой обработки, а не среда пакетной обработки), поэтому невозможно определить, когда топология закончена.Например, если вы потребляете сообщения из темы Kafka, производители могут в любой момент добавить дополнительные сообщения в тему, так как же потребитель определит, что он закончил потреблять?

CompletableSpout существует в основном для облегчения тестированияпотому что тогда носик может сказать, сделано ли это.Метод completeTopology, который я связал, может затем использовать эту дополнительную функцию, чтобы сказать, все ли носители в топологии «сделаны», и может остановить топологию после этого.

Если носик, который вы используете в тесте, не 'Если реализовать CompletableSpout (чего нет у большинства носителей), невозможно определить, когда топология в целом закончена.Во многих случаях вы все еще можете сделать лучше, чем в примере, который вы связали, например, если моя топология должна записывать 10 сообщений в очередь в тесте, я могу завершить тест, как только 10 сообщений будут записаны в очередь.

Что касается потоков Akka, я не очень знаком с ними, но, глядя на вводную документацию, вы можете считать CompletableSpouts похожими на ограниченные источники (например, a Source(1 to 100)), тогда как «нормальные» носикинеограниченные источники (например, Source.repeat(1)).

...