My mon go развертывание состоит из одного основного сервера, двух вторичных серверов и нескольких арбитров. Когда я запускаю мое приложение весенней загрузки, я хочу реализовать следующую логику c: если к нему подключен доступный набор реплик, в противном случае подключитесь к серверу localhost mongod и запустите отдельный поток, который проверит, доступен ли набор реплик. Когда набор реплик станет доступен, отключитесь от localhost и подключитесь к набору реплик. У каждой машины, на которой запущено мое приложение, есть свой работающий mongod, который является либо основным сервером, либо вторичным, либо арбитром. как мне это сделать? Я попытался создать свой собственный MongoDbFactory, например:
public class SwithchingMongoDbFactory implements DisposableBean, MongoDbFactory {
public static volatile boolean replicaStatusOk = true;
public static Collection<ReplicaStatusListener> allListeners = new ArrayList<ReplicaStatusListener>(){
{
add(new DefaultReplicaStatusListener());
}
};
private String databaseName;
private boolean mongoInstanceCreated;
private PersistenceExceptionTranslator exceptionTranslator;
private MongoClientURI replicaUri;
private MongoClientURI localUri;
private final ScheduledExecutorService chekingMasterThread = Executors.newSingleThreadScheduledExecutor(
r -> new Thread(r, "Поток проверки подключения к ведущему серверу реплиционного множества")
);
private static final long CHECKING_MASTER_PERIOD = 2L;//sec
public SwithchingMongoDbFactory(MongoClientURI uri){
replicaUri = uri;
databaseName = uri.getDatabase();
init(true);
}
public SwithchingMongoDbFactory(MongoClient mongoClient, String databaseName){
init(mongoClient, databaseName, false);
}
private void init(boolean mongoInstanceCreated){
Assert.hasText(databaseName, "Database name must not be empty!");
Assert.isTrue(databaseName.matches("[\\w-]+"),
"Database name must only contain letters, numbers, underscores and dashes!");
this.replicaMongoClient = replicaMongoClient;
final Pattern pattern = Pattern.compile("^mongodb://(.+):(.+)@(.+)");
final Matcher matcher = pattern.matcher(replicaUri.getURI());
final boolean found = matcher.find();
if(found){
final String user = matcher.group(1);
final String password = matcher.group(2);
final String localUriStr = "mongodb://" + user + ":" + password + "@127.0.0.1:27017/" + databaseName;
localUri = new MongoClientURI(localUriStr);
} else{
throw new UnknownFormatConversionException("Ошибка при парсенге строки подключения к MongoDB");
}
CheckingMongoReplicaSet checkingMongoReplicaSet = new CheckingMongoReplicaSet();
final ServerAddress master = replicaMongoClient.getReplicaSetStatus().getMaster();
if(master == null){ //Не удалось связаться с класетром
closeReplicaClient();
replicaStatusOk = false;
allListeners.forEach(ReplicaStatusListener::onReplicaFailed);
localHostMongoClient = new MongoClient(localUri);
chekingMasterThread.scheduleAtFixedRate(
checkingMongoReplicaSet, 0,
CHECKING_MASTER_PERIOD, TimeUnit.SECONDS
);
}
this.databaseName = databaseName;
this.mongoInstanceCreated = mongoInstanceCreated;
this.exceptionTranslator = new MongoExceptionTranslator();
}
private void closeReplicaClient(){
replicaMongoClient.close();
replicaMongoClient = null;
}
private void closeLocalHostClient(){
localHostMongoClient.close();
localHostMongoClient = null;
}
@Override
public MongoDatabase getDb() throws DataAccessException {
return getDb(databaseName);
}
@Override
public MongoDatabase getDb(String dbName) throws DataAccessException {
Assert.hasText(dbName, "Database name must not be empty.");
if(replicaStatusOk){
return replicaMongoClient.getDatabase(databaseName);
} else {
return localHostMongoClient.getDatabase(databaseName);
}
}
@Override
public PersistenceExceptionTranslator getExceptionTranslator() {
return exceptionTranslator;
}
@Override
public DB getLegacyDb() {
if(replicaStatusOk){
return replicaMongoClient.getDB(databaseName);
} else {
return localHostMongoClient.getDB(databaseName);
}
}
@Override
public void destroy() throws Exception {
if (mongoInstanceCreated) {
replicaMongoClient.close();
}
localHostMongoClient.close();
}
private class CheckingMongoReplicaSet implements Runnable {
private boolean needCheck = true;
@Override
public void run() {
if(!needCheck || Thread.currentThread().isInterrupted()) return;
MongoClient tempClient = new MongoClient(SwithchingMongoDbFactory.this.replicaUri);
final ReplicaSetStatus replicaSetStatus = tempClient.getReplicaSetStatus();
final ServerAddress master = replicaSetStatus.getMaster();
if (master != null) {
SwithchingMongoDbFactory.this.closeLocalHostClient();
SwithchingMongoDbFactory.this.replicaMongoClient = tempClient;
replicaStatusOk = true;
allListeners.forEach(ReplicaStatusListener::onReplicaRecover);
needCheck = false;
Thread.currentThread().interrupt();
} else {
tempClient.close();
}
}
}
public interface ReplicaStatusListener{
void onReplicaFailed();
void onReplicaRecover();
}
public static class DefaultReplicaStatusListener implements ReplicaStatusListener{
private static Logger logger = LogManager.getLogger(DefaultReplicaStatusListener.class);
@Override
public void onReplicaFailed() {
logger.warn("Ошибка подключения к реплиционному кластеру");
}
@Override
public void onReplicaRecover() {
logger.warn("Подключение к реплиционному кластеру восстановлено");
}
}
}
И использовать мой завод в приложении с весенней загрузкой:
@Configuration
@PropertySource("file:./conf/mongodb.properties")
@Component
public class MongoConfiguration {
@Value("${mongo.uri}")
private String uri;
public final static String dbName = "my-super-db";
@Bean
public MongoDbFactory mongoDbFactory(){
final MongoClientURI mongoClientURI = new MongoClientURI(uri);
return new SwithchingMongoDbFactory(mongoClientURI);
}
@Bean
public MongoTemplate mongoTemplate(){
return new MongoTemplate(mongoDbFactory());
}
}
Проблема в том, что если основной сервер недоступен, mon go драйвер пытается подключиться к нему бесконечно. Поэтому мой chekingMasterThread блокирует приложение. Когда я устанавливаю readPreference = pimaryPreferred в URI моего подключения и основной сервер недоступен, мое приложение запускается успешно, но одна операция чтения занимает около 30 секунд, что недопустимо.