Интеграция Storm-kafka-mongoDB - PullRequest
       25

Интеграция Storm-kafka-mongoDB

0 голосов
/ 06 марта 2019

Я читаю 500 МБ случайных кортежей от производителя Kafka непрерывно, и в топологии шторма я вставляю его в MongoDb с помощью Mongo Java Driver.Проблема в том, что я получаю действительно низкую пропускную способность - 4-5 кортежей в секунду.

Без вставки в БД, если я пишу простое утверждение печати, я получаю пропускную способность 684 кортежа в секунду.Я планирую запустить 1 миллион записей из Kafka и проверить пропускную способность с помощью монго-вставки.

Я попытался настроить с помощью config setMaxSpoutPending, setMessageTimeoutSecs parms в kafkaconfig.

   final SpoutConfig kafkaConf = new SpoutConfig(zkrHosts, kafkaTopic, zkRoot, clientId);
    kafkaConf.ignoreZkOffsets=false;
    kafkaConf.useStartOffsetTimeIfOffsetOutOfRange=true;
    kafkaConf.startOffsetTime=kafka.api.OffsetRequest.LatestTime();
    kafkaConf.stateUpdateIntervalMs=2000;
    kafkaConf.scheme = new SchemeAsMultiScheme(new StringScheme());
    final TopologyBuilder topologyBuilder = new TopologyBuilder();
    topologyBuilder.setSpout("kafka-spout", new KafkaSpout(kafkaConf), 1);
    topologyBuilder.setBolt("print-messages", new MyKafkaBolt()).shuffleGrouping("kafka-spout");
     Config conf = new Config();
     conf.setDebug(true);
     conf.setMaxSpoutPending(1000);
     conf.setMessageTimeoutSecs(30);

Выполнить метод болта

      JSONObject jObj = new JSONObject();
    jObj.put("key", input.getString(0));

        if (null !=jObj && jObj.size() > 0 ) {
            final DBCollection quoteCollection = dbConnect.getConnection().getCollection("stormPoc");
            if (quoteCollection != null) {
                BasicDBObject dbObject = new BasicDBObject();
                dbObject.putAll(jObj);
                quoteCollection.insert(dbObject);
            //  logger.info("inserted in Collection !!!");
            } else {
                logger.info("Error while inserting data in DB!!!");
            }
            collector.ack(input);

1 Ответ

0 голосов
/ 06 марта 2019

Существует модуль storm-mongodb для интеграции с Mongo.Разве это не делает работу?https://github.com/apache/storm/tree/b07413670fa62fec077c92cb78fc711c3bda820c/external/storm-mongodb

Вы не должны использовать storm-kafka для интеграции Kafka, это устарело.Вместо этого используйте storm-kafka-client.

Настройка conf.setDebug(true) повлияет на вашу обработку, так как Storm будет регистрировать довольно большое количество текста на кортеж.

...