Вы можете сделать это с помощью https://github.com/apache/storm/blob/master/storm-server/src/main/java/org/apache/storm/Testing.java#L376.
Причина, по которой в некоторых случаях это не используется, заключается в том, что для реализации интерфейса CompletableSpout требуется каждый носик в топологии https://github.com/apache/storm/blob/4137328b75c06771f84414c3c2113e2d1c757c08/storm-client/src/jvm/org/apache/storm/testing/CompletableSpout.java.
Большинство носителей Storm никогда не достигают точки, когда они «сделаны» (поскольку это среда потоковой обработки, а не среда пакетной обработки), поэтому невозможно определить, когда топология закончена.Например, если вы потребляете сообщения из темы Kafka, производители могут в любой момент добавить дополнительные сообщения в тему, так как же потребитель определит, что он закончил потреблять?
CompletableSpout существует в основном для облегчения тестированияпотому что тогда носик может сказать, сделано ли это.Метод completeTopology, который я связал, может затем использовать эту дополнительную функцию, чтобы сказать, все ли носители в топологии «сделаны», и может остановить топологию после этого.
Если носик, который вы используете в тесте, не 'Если реализовать CompletableSpout (чего нет у большинства носителей), невозможно определить, когда топология в целом закончена.Во многих случаях вы все еще можете сделать лучше, чем в примере, который вы связали, например, если моя топология должна записывать 10 сообщений в очередь в тесте, я могу завершить тест, как только 10 сообщений будут записаны в очередь.
Что касается потоков Akka, я не очень знаком с ними, но, глядя на вводную документацию, вы можете считать CompletableSpouts похожими на ограниченные источники (например, a Source(1 to 100)
), тогда как «нормальные» носикинеограниченные источники (например, Source.repeat(1)
).