Я создаю приложение Java Spring с использованием Storm 1.1.2 и Kafka 0.11 для запуска в контейнере Docker.
Все в моей топологии работает, как запланировано, но при высокой нагрузке от Kafka, отставание Kafkaувеличивается все больше и больше с течением времени.
Мой KafkaSpoutConfig:
KafkaSpoutConfig<String,String> spoutConf =
KafkaSpoutConfig.builder("kafkaContainerName:9092", "myTopic")
.setProp(ConsumerConfig.GROUP_ID_CONFIG, "myGroup")
.setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, MyObjectDeserializer.class)
.build()
Тогда моя топология выглядит следующим образом
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("stormKafkaSpout", new KafkaSpout<String,String>(spoutConf), 25);
builder.setBolt("routerBolt", new RouterBolt(),25).shuffleGrouping("stormKafkaSpout");
Config conf = new Config();
conf.setNumWorkers(10);
conf.put(Config.STORM_ZOOKEEPER_SERVERS, ImmutableList.of("zookeeper"));
conf.put(Config.STORM_ZOOKEEPER_PORT, 2181);
conf.put(Config.NIMBUS_SEEDS, ImmutableList.of("nimbus"));
conf.put(Config.NIMBUS_THRIFT_PORT, 6627);
System.setProperty("storm.jar", "/opt/storm.jar");
StormSubmitter.submitTopology("topologyId", conf, builder.createTopology());
RouterBolt (который расширяет BaseRichBolt) делает один оченьпростой оператор switch и затем использует локальный объект KafkaProducer для отправки нового сообщения в другую тему.Как я уже сказал, все компилируется и топология работает так, как ожидалось, но при высокой нагрузке (3000 сообщений / с) отставание Кафки просто накапливается, что эквивалентно низкой пропускной способности для топологии.
Я пытался отключить взлом с помощью
conf.setNumAckers(0);
и
conf.put(Config.TOPOLGY_ACKER_EXECUTORS, 0);
, но, думаю, это не проблема с взломом.
У меня естьв пользовательском интерфейсе Storm видно, что у RouterBolt задержка выполнения составляет 1,2 мс, а задержка обработки - 0,03 мс при высокой нагрузке, что заставляет меня поверить, что Spout является узким местом. Также подсказка о параллелизме равна 25, поскольку в myTopic имеется 25 разделов,Спасибо!