Как перенастроить MongoClient во время выполнения, чтобы переключить подключение к реплике на подключение к локальному хосту и наоборот? - PullRequest
0 голосов
/ 19 февраля 2020

My mon go развертывание состоит из одного основного сервера, двух вторичных серверов и нескольких арбитров. Когда я запускаю мое приложение весенней загрузки, я хочу реализовать следующую логику c: если к нему подключен доступный набор реплик, в противном случае подключитесь к серверу localhost mongod и запустите отдельный поток, который проверит, доступен ли набор реплик. Когда набор реплик станет доступен, отключитесь от localhost и подключитесь к набору реплик. У каждой машины, на которой запущено мое приложение, есть свой работающий mongod, который является либо основным сервером, либо вторичным, либо арбитром. как мне это сделать? Я попытался создать свой собственный MongoDbFactory, например:

public class SwithchingMongoDbFactory implements DisposableBean, MongoDbFactory {

public static volatile boolean replicaStatusOk = true;
public static Collection<ReplicaStatusListener> allListeners = new ArrayList<ReplicaStatusListener>(){
    {
        add(new DefaultReplicaStatusListener());
    }
};

private String databaseName;
private boolean mongoInstanceCreated;
private PersistenceExceptionTranslator exceptionTranslator;
private MongoClientURI replicaUri;
private MongoClientURI localUri;

private final ScheduledExecutorService chekingMasterThread = Executors.newSingleThreadScheduledExecutor(
        r -> new Thread(r, "Поток проверки подключения к ведущему серверу реплиционного множества")
);
private static final long CHECKING_MASTER_PERIOD = 2L;//sec


public SwithchingMongoDbFactory(MongoClientURI uri){
    replicaUri = uri;
    databaseName = uri.getDatabase();
    init(true);
}

public SwithchingMongoDbFactory(MongoClient mongoClient, String databaseName){
    init(mongoClient, databaseName, false);
}

private void init(boolean mongoInstanceCreated){
    Assert.hasText(databaseName, "Database name must not be empty!");
    Assert.isTrue(databaseName.matches("[\\w-]+"),
            "Database name must only contain letters, numbers, underscores and dashes!");

    this.replicaMongoClient = replicaMongoClient;
    final Pattern pattern = Pattern.compile("^mongodb://(.+):(.+)@(.+)");
    final Matcher matcher = pattern.matcher(replicaUri.getURI());
    final boolean found = matcher.find();
    if(found){
        final String user = matcher.group(1);
        final String password = matcher.group(2);
        final String localUriStr = "mongodb://" + user + ":" + password + "@127.0.0.1:27017/" + databaseName;
        localUri = new MongoClientURI(localUriStr);
    } else{
        throw new UnknownFormatConversionException("Ошибка при парсенге строки подключения к MongoDB");
    }
    CheckingMongoReplicaSet checkingMongoReplicaSet = new CheckingMongoReplicaSet();
    final ServerAddress master = replicaMongoClient.getReplicaSetStatus().getMaster();
    if(master == null){ //Не удалось связаться с класетром
        closeReplicaClient();
        replicaStatusOk = false;
        allListeners.forEach(ReplicaStatusListener::onReplicaFailed);
        localHostMongoClient = new MongoClient(localUri);
        chekingMasterThread.scheduleAtFixedRate(
                checkingMongoReplicaSet, 0,
                CHECKING_MASTER_PERIOD, TimeUnit.SECONDS
        );
    }
    this.databaseName = databaseName;
    this.mongoInstanceCreated = mongoInstanceCreated;
    this.exceptionTranslator = new MongoExceptionTranslator();
}

private void closeReplicaClient(){
    replicaMongoClient.close();
    replicaMongoClient = null;
}

private void closeLocalHostClient(){
    localHostMongoClient.close();
    localHostMongoClient = null;
}

@Override
public MongoDatabase getDb() throws DataAccessException {
    return getDb(databaseName);
}

@Override
public MongoDatabase getDb(String dbName) throws DataAccessException {
    Assert.hasText(dbName, "Database name must not be empty.");

    if(replicaStatusOk){
        return replicaMongoClient.getDatabase(databaseName);
    } else {
        return localHostMongoClient.getDatabase(databaseName);
    }
}

@Override
public PersistenceExceptionTranslator getExceptionTranslator() {
    return exceptionTranslator;
}

@Override
public DB getLegacyDb() {
    if(replicaStatusOk){
        return replicaMongoClient.getDB(databaseName);
    } else {
        return localHostMongoClient.getDB(databaseName);
    }
}

@Override
public void destroy() throws Exception {
    if (mongoInstanceCreated) {
        replicaMongoClient.close();
    }
    localHostMongoClient.close();
}

private class CheckingMongoReplicaSet implements Runnable {

    private boolean needCheck = true;

    @Override
    public void run() {
        if(!needCheck || Thread.currentThread().isInterrupted()) return;

        MongoClient tempClient = new MongoClient(SwithchingMongoDbFactory.this.replicaUri);
        final ReplicaSetStatus replicaSetStatus = tempClient.getReplicaSetStatus();
        final ServerAddress master = replicaSetStatus.getMaster();
        if (master != null) {
            SwithchingMongoDbFactory.this.closeLocalHostClient();
            SwithchingMongoDbFactory.this.replicaMongoClient = tempClient;
            replicaStatusOk = true;
            allListeners.forEach(ReplicaStatusListener::onReplicaRecover);
            needCheck = false;
            Thread.currentThread().interrupt();
        } else {
            tempClient.close();
        }
    }
}

public interface ReplicaStatusListener{
    void onReplicaFailed();
    void onReplicaRecover();
}

public static class DefaultReplicaStatusListener implements ReplicaStatusListener{

    private static Logger logger = LogManager.getLogger(DefaultReplicaStatusListener.class);
    @Override
    public void onReplicaFailed() {
        logger.warn("Ошибка подключения к реплиционному кластеру");
    }

    @Override
    public void onReplicaRecover() {
        logger.warn("Подключение к реплиционному кластеру восстановлено");
    }
}

}

И использовать мой завод в приложении с весенней загрузкой:

@Configuration
@PropertySource("file:./conf/mongodb.properties")
@Component
public class MongoConfiguration {
    @Value("${mongo.uri}")
    private String uri;
    public final static String dbName = "my-super-db";

@Bean
    public MongoDbFactory mongoDbFactory(){
        final MongoClientURI mongoClientURI = new MongoClientURI(uri);
        return new SwithchingMongoDbFactory(mongoClientURI);
    }

    @Bean
    public MongoTemplate mongoTemplate(){
        return new MongoTemplate(mongoDbFactory());
    }
}

Проблема в том, что если основной сервер недоступен, mon go драйвер пытается подключиться к нему бесконечно. Поэтому мой chekingMasterThread блокирует приложение. Когда я устанавливаю readPreference = pimaryPreferred в URI моего подключения и основной сервер недоступен, мое приложение запускается успешно, но одна операция чтения занимает около 30 секунд, что недопустимо.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...