Как исправить ошибку «Компонент: [x] подписывается из несуществующего компонента [y]» в топологии Apache Storm Trident - PullRequest
0 голосов
/ 12 июня 2019

Я только что реализовал функцию DRPC трезубца для обработки входящих сообщений, и я пытаюсь сохранить количество обработанных кортежей на заключительном этапе топологии как состояние Trident.Вот моя топология:

topology.newDRPCStream("portfolio")
    .map(parseMapFunction,
        new Fields("portfolioUrn", "portfolioSourceSystem", "portfolioRegion",
            "portfolioTimestamp", "portfolioPayload"))
    .filter(new FilterNull())
    .flatMap(splitMapFunction,
        new Fields("portfolioUrn", "portfolioSourceSystem", "portfolioRegion",
            "portfolioTimestamp", "strategyCode"))
    .parallelismHint(1)
    .shuffle()
    .each(new Fields("strategyCode"), findMongoTradesFunction,
        new Fields("uitid", "id", "sourceSystem", "sourceTransactionTime", "publicationTime",
            "tradeVersion", "urn", "riskViewFrom", "riskViewTo", "authorized"))
    .parallelismHint(10)
    .shuffle()
    .filter(tradeFilterFunction)
    .parallelismHint(150)
    .groupBy(new Fields("uitid"))
    .aggregate(
        new Fields("portfolioUrn", "portfolioTimestamp", "strategyCode", "id", "sourceSystem",
            "sourceTransactionTime", "publicationTime", "tradeVersion", "urn", "riskViewFrom",
            "riskViewTo", "uitid"), reduceAggregateFunction,
        new Fields("portfolioUrn", "portfolioTimestamp", "strategyCode", "id", "sourceSystem",
            "sourceTransactionTime", "publicationTime", "tradeVersion", "urn", "riskViewFrom",
            "riskViewTo"))
    .parallelismHint(200)
    .groupBy(new Fields("portfolioUrn"))
    .persistentAggregate(stateFactory, new Count(), new Fields("count"));

Когда я пытаюсь отправить эту топологию в Storm, у меня появляется эта ошибка:

Exception in thread "main" java.lang.RuntimeException: InvalidTopologyException(msg:Component: [b-4] subscribes from non-existent component [$mastercoord-bg0])
at org.apache.storm.StormSubmitter.submitTopologyAs(StormSubmitter.java:273)
at org.apache.storm.StormSubmitter.submitTopology(StormSubmitter.java:387)
at org.apache.storm.StormSubmitter.submitTopology(StormSubmitter.java:159)
at com.citi.tm.portfolio.tps.PortfolioLauncher.main(PortfolioLauncher.java:34)
Caused by: InvalidTopologyException(msg:Component: [b-4] subscribes from non-existent component [$mastercoord-bg0])
at org.apache.storm.generated.Nimbus$submitTopology_result$submitTopology_resultStandardScheme.read(Nimbus.java:8070)
at org.apache.storm.generated.Nimbus$submitTopology_result$submitTopology_resultStandardScheme.read(Nimbus.java:8047)
at org.apache.storm.generated.Nimbus$submitTopology_result.read(Nimbus.java:7981)
at org.apache.storm.thrift.TServiceClient.receiveBase(TServiceClient.java:86)
at org.apache.storm.generated.Nimbus$Client.recv_submitTopology(Nimbus.java:306)
at org.apache.storm.generated.Nimbus$Client.submitTopology(Nimbus.java:290)
at org.apache.storm.StormSubmitter.submitTopologyInDistributeMode(StormSubmitter.java:326)
at org.apache.storm.StormSubmitter.submitTopologyAs(StormSubmitter.java:260)
... 3 more

Я могу успешно отправить топологию, если удаляю последнюю2 функции из этой топологии:

.groupBy(new Fields("portfolioUrn"))
.persistentAggregate(stateFactory, new Count(), new Fields("count"));

После того, как я запустил свою функцию агрегирования (aggregate ()), я хотел бы сгруппировать кортежи с полем portfolioUrn и сохранить счет в mongoDB.Я не понимаю, почему последний раздел groupBy (). PersistentAggregate () вызывает эту ошибку.Не могли бы вы помочь найти причину?

...