Как настроить правильный параллелизм в персисторном болте? - PullRequest
0 голосов
/ 25 апреля 2019

Я использую Apache Storm для создания топологии, которая первоначально читает «поток» кортежа в файле, а затем разделяет его и хранит кортежи в mongodb.

У меня есть кластер на Atlas с набором общих реплик.Я уже разработал топологию, и решение работает правильно, если я использую один поток.

    public static StormTopology build() {
        return buildWithSpout();
    }

    public static StormTopology buildWithSpout() {
        Config config = new Config();
        TopologyBuilder builder = new TopologyBuilder();

        CsvSpout datasetSpout = new CsvSpout("file.txt");
        SplitterBolt splitterBolt = new SplitterBolt(",");
        PartitionMongoInsertBolt insertPartitionBolt = new PartitionMongoInsertBolt();

        builder.setSpout(DATA_SPOUT_ID, datasetSpout, 1);
        builder.setBolt(DEPENDENCY_SPLITTER_ID, splitterBolt, 1).shuffleGrouping(DATA_SPOUT_ID);
        builder.setBolt(UPDATER_COUNTER_ID, insertPartitionBolt, 1).shuffleGrouping(DEPENDENCY_SPLITTER_ID);
    }

Однако, когда я использую параллельные процессы, мой персисторный болт не сохраняет все кортежи в mongodb, несмотря на то, что кортежи правильно выводятся предыдущим болтом.

        builder.setSpout(DATA_SPOUT_ID, datasetSpout, 1);
        builder.setBolt(DEPENDENCY_SPLITTER_ID, splitterBolt, 3).shuffleGrouping(DATA_SPOUT_ID);
        builder.setBolt(UPDATER_COUNTER_ID, insertPartitionBolt, 3).shuffleGrouping(DEPENDENCY_SPLITTER_ID);

Thisэто мой первый болт:

public class SplitterBolt extends BaseBasicBolt {
    private String del;
    private MongoConnector db = null;

    public SplitterBolt(String del) {
        this.del = del;
    }

    public void prepare(Map stormConf, TopologyContext context) {
        db = MongoConnector.getInstance();
    }

    public void execute(Tuple input, BasicOutputCollector collector) {
        String tuple = input.getStringByField("tuple");
        int idTuple = Integer.parseInt(input.getStringByField("id"));

        String opString = "";
        String[] data = tuple.split(this.del);
        for(int i=0; i < data.length; i++) {
            OpenBitSet attrs = new OpenBitSet();
            attrs.fastSet(i);
            opString = Utility.toStringOpenBitSet(attrs, 5);
            collector.emit(new Values(idTuple, opString, data[i]));
        }
        db.incrementCount();
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("idtuple","binaryattr","value"));
    }
}

А это мой болт-персистор, который хранит в монго все кортежи:

public class PartitionMongoInsertBolt extends BaseBasicBolt {
    private MongoConnector mongodb = null;

    public void prepare(Map stormConf, TopologyContext context) {
        //Singleton Instance
        mongodb = MongoConnector.getInstance();
    }

    public void execute(Tuple input, BasicOutputCollector collector) {
        mongodb.insertUpdateTuple(input);
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {}
}

Мое единственное сомнение в том, что я использовал шаблон синглтона для класса соединенияМонгоМожет ли это быть проблемой?

ОБНОВЛЕНИЕ

Это мой класс MongoConnector:

public class MongoConnector {
    private MongoClient mongoClient = null;
    private MongoDatabase database = null;
    private MongoCollection<Document> partitionCollection = null;

    private static MongoConnector mongoInstance = null;

    public MongoConnector() {
        MongoClientURI uri = new MongoClientURI("connection string");
        this.mongoClient = new MongoClient(uri);
        this.database = mongoClient.getDatabase("db.database");
        this.partitionCollection = database.getCollection("db.collection");
    }

    public static MongoConnector getInstance() {
        if (mongoInstance == null)
            mongoInstance = new MongoConnector();
        return mongoInstance;
    }

    public void insertUpdateTuple2(Tuple tuple) {
        int idTuple = (Integer) tuple.getValue(0);
        String attrs = (String) tuple.getValue(1);
        String value = (String) tuple.getValue(2);
        value = value.replace('.', ',');

        Bson query = Filters.eq("_id", attrs);
        Document docIterator = this.partitionCollection.find(query).first();

        if (docIterator != null) { 
            Bson newValue = new Document(value, idTuple);
            Bson updateDocument = new Document("$push", newValue);
            this.partitionCollection.updateOne(docIterator, updateDocument);
        } else { 
            Document document = new Document();
            document.put("_id", attrs);
            ArrayList<Integer> partition = new ArrayList<Integer>();
            partition.add(idTuple);
            document.put(value, partition);
            this.partitionCollection.insertOne(document);
        }
    }
}   

ОБНОВЛЕНИЕ РЕШЕНИЯ

Я решил проблему с этой строкой:

this.partitionCollection.updateOne(docIterator, updateDocument);

in

this.partitionCollection.findOneAndUpdate(query, updateDocument);
...