Вот код, который я пытаюсь выполнить. Я намеренно терплю неудачу в болте. Так что я вижу, что неудачно переданные сообщения воспроизводятся штормом. Но, похоже, этого не происходит.
public static class FastRandomSentenceSpout extends BaseRichSpout {
SpoutOutputCollector _collector;
Random _rand;
private static final String[] CHOICES = {
"marry had a little lamb whos fleese was white as snow",
"and every where that marry went the lamb was sure to go"
};
@Override
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
_collector = collector;
_rand = ThreadLocalRandom.current();
}
@Override
public void nextTuple() {
String sentence = CHOICES[_rand.nextInt(CHOICES.length)];
_collector.emit(new Values(sentence), sentence);
}
@Override
public void fail(Object id) {
System.out.println("RAVI: the failedObjectId = "+id);
_collector.emit(new Values(id), id);
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("sentence"));
}
}
Вот подробности о Сплит-предложении Болт. Там, где я намеренно отказываю.
public static class SplitSentence extends BaseRichBolt
{
OutputCollector _collector;
@Override
public void prepare(Map conf,
TopologyContext context,
OutputCollector collector)
{
_collector = collector;
}
Это функция, где происходит сбой
@Override
public void execute(Tuple tuple)
{
String sentence = tuple.getString(0);
System.out.println("sentence = "+sentence);
if(sentence.equals("marry had a little lamb whos fleese was white as snow"))
{
System.out.println("going to fail");
_collector.fail(tuple);
}
else
{
for (String word: sentence.split("\\s+")) {
_collector.emit(tuple, new Values(word, 1));
}
_collector.ack(tuple);
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word", "count"));
}
}
Это детали кода вождения. publi c stati c void main (String [] args) выдает исключение {
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("spout", new FastRandomSentenceSpout(), 4);
builder.setBolt("split", new SplitSentence(), 4).shuffleGrouping("spout");
Config conf = new Config();
conf.registerMetricsConsumer(
org.apache.storm.metric.LoggingMetricsConsumer.class);
String name = "wc-test";
if (args != null && args.length > 0) {
name = args[0];
}
conf.setNumWorkers(1);
StormSubmitter.submitTopologyWithProgressBar(name,
conf,
builder.createTopology());
}