Я использую Apache Storm для создания топологии, которая первоначально читает «поток» кортежа в файле, а затем разделяет его и хранит кортежи в mongodb.
У меня есть кластер на Atlas с набором общих реплик.Я уже разработал топологию, и решение работает правильно, если я использую один поток.
public static StormTopology build() {
return buildWithSpout();
}
public static StormTopology buildWithSpout() {
Config config = new Config();
TopologyBuilder builder = new TopologyBuilder();
CsvSpout datasetSpout = new CsvSpout("file.txt");
SplitterBolt splitterBolt = new SplitterBolt(",");
PartitionMongoInsertBolt insertPartitionBolt = new PartitionMongoInsertBolt();
builder.setSpout(DATA_SPOUT_ID, datasetSpout, 1);
builder.setBolt(DEPENDENCY_SPLITTER_ID, splitterBolt, 1).shuffleGrouping(DATA_SPOUT_ID);
builder.setBolt(UPDATER_COUNTER_ID, insertPartitionBolt, 1).shuffleGrouping(DEPENDENCY_SPLITTER_ID);
}
Однако, когда я использую параллельные процессы, мой персисторный болт не сохраняет все кортежи в mongodb, несмотря на то, что кортежи правильно выводятся предыдущим болтом.
builder.setSpout(DATA_SPOUT_ID, datasetSpout, 1);
builder.setBolt(DEPENDENCY_SPLITTER_ID, splitterBolt, 3).shuffleGrouping(DATA_SPOUT_ID);
builder.setBolt(UPDATER_COUNTER_ID, insertPartitionBolt, 3).shuffleGrouping(DEPENDENCY_SPLITTER_ID);
Thisэто мой первый болт:
public class SplitterBolt extends BaseBasicBolt {
private String del;
private MongoConnector db = null;
public SplitterBolt(String del) {
this.del = del;
}
public void prepare(Map stormConf, TopologyContext context) {
db = MongoConnector.getInstance();
}
public void execute(Tuple input, BasicOutputCollector collector) {
String tuple = input.getStringByField("tuple");
int idTuple = Integer.parseInt(input.getStringByField("id"));
String opString = "";
String[] data = tuple.split(this.del);
for(int i=0; i < data.length; i++) {
OpenBitSet attrs = new OpenBitSet();
attrs.fastSet(i);
opString = Utility.toStringOpenBitSet(attrs, 5);
collector.emit(new Values(idTuple, opString, data[i]));
}
db.incrementCount();
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("idtuple","binaryattr","value"));
}
}
А это мой болт-персистор, который хранит в монго все кортежи:
public class PartitionMongoInsertBolt extends BaseBasicBolt {
private MongoConnector mongodb = null;
public void prepare(Map stormConf, TopologyContext context) {
//Singleton Instance
mongodb = MongoConnector.getInstance();
}
public void execute(Tuple input, BasicOutputCollector collector) {
mongodb.insertUpdateTuple(input);
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {}
}
Мое единственное сомнение в том, что я использовал шаблон синглтона для класса соединенияМонгоМожет ли это быть проблемой?
ОБНОВЛЕНИЕ
Это мой класс MongoConnector:
public class MongoConnector {
private MongoClient mongoClient = null;
private MongoDatabase database = null;
private MongoCollection<Document> partitionCollection = null;
private static MongoConnector mongoInstance = null;
public MongoConnector() {
MongoClientURI uri = new MongoClientURI("connection string");
this.mongoClient = new MongoClient(uri);
this.database = mongoClient.getDatabase("db.database");
this.partitionCollection = database.getCollection("db.collection");
}
public static MongoConnector getInstance() {
if (mongoInstance == null)
mongoInstance = new MongoConnector();
return mongoInstance;
}
public void insertUpdateTuple2(Tuple tuple) {
int idTuple = (Integer) tuple.getValue(0);
String attrs = (String) tuple.getValue(1);
String value = (String) tuple.getValue(2);
value = value.replace('.', ',');
Bson query = Filters.eq("_id", attrs);
Document docIterator = this.partitionCollection.find(query).first();
if (docIterator != null) {
Bson newValue = new Document(value, idTuple);
Bson updateDocument = new Document("$push", newValue);
this.partitionCollection.updateOne(docIterator, updateDocument);
} else {
Document document = new Document();
document.put("_id", attrs);
ArrayList<Integer> partition = new ArrayList<Integer>();
partition.add(idTuple);
document.put(value, partition);
this.partitionCollection.insertOne(document);
}
}
}
ОБНОВЛЕНИЕ РЕШЕНИЯ
Я решил проблему с этой строкой:
this.partitionCollection.updateOne(docIterator, updateDocument);
in
this.partitionCollection.findOneAndUpdate(query, updateDocument);