Я пытаюсь прочитать данные из Кафки и вставить в Кассандру, используя шторм. Я также настроил топологию, но у меня возникли некоторые проблемы, и я понятия не имею, почему это происходит.
Вот мой отправитель.
TopologyBuilder topologyBuilder = new TopologyBuilder();
topologyBuilder.setSpout("spout", new KafkaSpout(spoutConfig));
topologyBuilder.setBolt("checkingbolt", new CheckingBolt("cassandraBoltStream")).shuffleGrouping("spout");
topologyBuilder.setBolt("cassandrabolt", new CassandraInsertBolt()).shuffleGrouping("checkingbolt");
Здесь, если я прокомментирую последнюю строку, я не вижу никаких исключений. В последней строке я получаю следующее сообщение об ошибке:
InvalidTopologyException(msg:Component: [cassandrabolt] subscribes from non-existent stream: [default] of component [checkingbolt])
Может кто-нибудь, пожалуйста, помогите мне, что здесь не так?
Вот выход outputFieldDeclarer в CheckingBolt
public void declareOutputFields(OutputFieldsDeclarer ofd) {
ofd.declareStream(cassandraBoltStream, new Fields(new String[]{"jsonFields"}));
}
У меня нет ничего в методе DeclareOutputFields для CassandraInsertBolt, так как этот болт не излучает никаких значений.
TIA