Почему Storm не воспроизводит сообщение о сбое в кластере при работе, а воспроизводит в режиме кластера на локальном рабочем столе - PullRequest
0 голосов
/ 21 апреля 2020

Вот код, который я пытаюсь выполнить. Я намеренно терплю неудачу в болте. Так что я вижу, что неудачно переданные сообщения воспроизводятся штормом. Но, похоже, этого не происходит.

public static class FastRandomSentenceSpout extends BaseRichSpout {
  SpoutOutputCollector _collector;
  Random _rand;
   private static final String[] CHOICES = {
       "marry had a little lamb whos fleese was white as snow",
       "and every where that marry went the lamb was sure to go"
   };

   @Override
   public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
      _collector = collector;
      _rand = ThreadLocalRandom.current();
    }

   @Override
   public void nextTuple() {
      String sentence = CHOICES[_rand.nextInt(CHOICES.length)];
      _collector.emit(new Values(sentence), sentence);
   }

   @Override
   public void fail(Object id) {
      System.out.println("RAVI: the failedObjectId = "+id);
      _collector.emit(new Values(id), id);
   }

   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("sentence"));
   }
 }

Вот подробности о Сплит-предложении Болт. Там, где я намеренно отказываю.

 public static class SplitSentence extends BaseRichBolt 
 {
     OutputCollector _collector;
     @Override
     public void prepare(Map conf,
                     TopologyContext context,
                     OutputCollector collector)
    {
       _collector = collector;
    }

Это функция, где происходит сбой

    @Override
    public void execute(Tuple tuple) 
    {
        String sentence = tuple.getString(0);
        System.out.println("sentence = "+sentence);
        if(sentence.equals("marry had a little lamb whos fleese was white as snow"))
        {
           System.out.println("going to fail");
           _collector.fail(tuple);
        }
        else
        { 
           for (String word: sentence.split("\\s+")) {
              _collector.emit(tuple, new Values(word, 1));
           }
           _collector.ack(tuple);
        }   
     }

     @Override
     public void declareOutputFields(OutputFieldsDeclarer declarer) {
         declarer.declare(new Fields("word", "count"));
     }
  }

Это детали кода вождения. publi c stati c void main (String [] args) выдает исключение {

   TopologyBuilder builder = new TopologyBuilder();

   builder.setSpout("spout", new FastRandomSentenceSpout(), 4);

   builder.setBolt("split", new SplitSentence(), 4).shuffleGrouping("spout");


   Config conf = new Config();
   conf.registerMetricsConsumer(
             org.apache.storm.metric.LoggingMetricsConsumer.class);


   String name = "wc-test";
   if (args != null && args.length > 0) {
       name = args[0];
   }

   conf.setNumWorkers(1);
   StormSubmitter.submitTopologyWithProgressBar(name, 
                                                conf,
                                                builder.createTopology());

  }

1 Ответ

0 голосов
/ 22 апреля 2020

оказывается, это было связано с глобальными настройками, упомянутыми в storm.yaml. Значение c указано

topology.acker.executors: 0
...