InvalidTopologyException (msg: Компонент: [x] подписывается из несуществующего потока [y] - PullRequest
0 голосов
/ 21 января 2020

Я пытаюсь прочитать данные из Кафки и вставить в Кассандру, используя шторм. Я также настроил топологию, но у меня возникли некоторые проблемы, и я понятия не имею, почему это происходит.

Вот мой отправитель.

        TopologyBuilder topologyBuilder = new TopologyBuilder();
        topologyBuilder.setSpout("spout", new KafkaSpout(spoutConfig));
        topologyBuilder.setBolt("checkingbolt", new CheckingBolt("cassandraBoltStream")).shuffleGrouping("spout");
        topologyBuilder.setBolt("cassandrabolt", new CassandraInsertBolt()).shuffleGrouping("checkingbolt"); 

Здесь, если я прокомментирую последнюю строку, я не вижу никаких исключений. В последней строке я получаю следующее сообщение об ошибке:

InvalidTopologyException(msg:Component: [cassandrabolt] subscribes from non-existent stream: [default] of component [checkingbolt])

Может кто-нибудь, пожалуйста, помогите мне, что здесь не так?

Вот выход outputFieldDeclarer в CheckingBolt

public void declareOutputFields(OutputFieldsDeclarer ofd) {
    ofd.declareStream(cassandraBoltStream, new Fields(new String[]{"jsonFields"}));
}

У меня нет ничего в методе DeclareOutputFields для CassandraInsertBolt, так как этот болт не излучает никаких значений.

TIA

1 Ответ

1 голос
/ 21 января 2020

Проблема здесь в том, что вы смешиваете имена потоков и имена компонентов (то есть носик / болт). Имена компонентов используются для ссылки на разные болты, в то время как имена потоков используются для ссылки на разные потоки, выходящие из одного болта. Например, если у вас есть болт с именем «evenOrOddBolt», он может испускать два потока: «четный» поток и и «нечетный» поток. Однако во многих случаях из болта выходит только один поток, поэтому в Storm есть несколько удобных методов для использования имени потока по умолчанию.

Когда вы делаете .shuffleGrouping("checkingbolt"), вы используете один из этих удобные методы, фактически говоря: «Я хочу, чтобы этот болт использовал поток по умолчанию, выходящий из checkingbolt». Существует перегруженная версия этого метода, которую вы можете использовать, если хотите явно назвать поток, но это полезно, только если у вас есть несколько потоков, выходящих из одного болта.

Когда вы делаете ofd.declareStream(cassandraBoltStream, new Fields(new String[]{"jsonFields"}));, вы говорят, что болт будет излучать поток под названием "cassandraBoltStream". Это, вероятно, не то, что вы хотите сделать, вы хотите объявить, что он будет излучать в потоке по умолчанию. Вы делаете это с помощью метода ofd.declare.

Подробнее см. В документации .

...