Я реализовал потоки изменений MongoDB с помощью Spring, и он отлично работает, когда основной узел набора реплик работает.
@Service
public class ChangeEventService {
private static final Logger logger = LoggerFactory.getLogger(ChangeEventService.class);
private final MongoClient mongoClient;
public ChangeEventService(MongoClient mongoClient) {
this.mongoClient = mongoClient;
}
@PostConstruct
public void subscribe() {
MongoDatabase db = mongoClient.getDatabase("experiment");
MongoCollection<Document> collection = db.getCollection("debug");
Block<ChangeStreamDocument<Document>> printBlock = changeStreamDocument -> {
logger.info("Received: {}", changeStreamDocument.getFullDocument().toString());
BsonDocument resumeToken = changeStreamDocument.getResumeToken();
};
collection.watch().forEach(printBlock);
logger.info("Consumer is ready to process");
}
}
Затем я закрыл основной узел набора реплик. Я ожидал, что поток изменений подождет, пока набор реплик выберет новый первичный сервер и продолжит получать изменения данных. Фактическое поведение - сбой приложения.
Из журналов я вижу, что соединение с первичным (27000) закрыто, что ожидается, затем он пытается открыть соединение с одним из вторичных (27001), но не может, потому что пул был закрыты.
Из документации: «Поток изменений привязан к коллекции, а документы потока изменений повторяются с помощью курсора. Этот курсор остается открытым до тех пор, пока он не будет явно закрыт, пока соединение с развертыванием MongoDB остается открытым и коллекция существует. "
2018-05-02 12:03:03.424 INFO 9560 --- [ main] c.e.m.service.ChangeEventService : Received: Document{{_id=5ae98cd7dcc8921c94d5f9e5, _class=com.mongodb.BasicDBObject, uuid=4f836d00-efc3-4d48-956a-af4dbfed90e7, now=Wed May 02 12:03:03 CEST 2018}}
2018-05-02 12:03:06.500 WARN 9560 --- [ main] org.mongodb.driver.connection : Got socket exception on connection [connectionId{localValue:4, serverValue:8}] to localhost:27000. All connections to localhost:27000 will be closed.
2018-05-02 12:03:06.501 INFO 9560 --- [ main] org.mongodb.driver.connection : Closed connection [connectionId{localValue:4, serverValue:8}] to localhost:27000 because there was a socket exception raised by this connection.
2018-05-02 12:03:07.502 INFO 9560 --- [ main] org.mongodb.driver.connection : Closed connection [connectionId{localValue:6}] to localhost:27000 because there was a socket exception raised by this connection.
2018-05-02 12:03:07.504 WARN 9560 --- [ main] ationConfigEmbeddedWebApplicationContext : Exception encountered during context initialization - cancelling refresh attempt: org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'changeEventService': Invocation of init method failed; nested exception is com.mongodb.MongoSocketOpenException: Exception opening socket
2018-05-02 12:03:07.505 INFO 9560 --- [localhost:27000] org.mongodb.driver.cluster : Exception in monitor thread while connecting to server localhost:27000
com.mongodb.MongoSocketOpenException: Exception opening socket
at com.mongodb.connection.SocketStream.open(SocketStream.java:62) ~[mongodb-driver-core-3.6.3.jar:na]
at com.mongodb.connection.InternalStreamConnection.open(InternalStreamConnection.java:126) ~[mongodb-driver-core-3.6.3.jar:na]
at com.mongodb.connection.DefaultServerMonitor$ServerMonitorRunnable.run(DefaultServerMonitor.java:128) ~[mongodb-driver-core-3.6.3.jar:na]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_161]
Caused by: java.net.ConnectException: Connection refused: connect
at java.net.DualStackPlainSocketImpl.waitForConnect(Native Method) ~[na:1.8.0_161]
at java.net.DualStackPlainSocketImpl.socketConnect(DualStackPlainSocketImpl.java:85) ~[na:1.8.0_161]
at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350) ~[na:1.8.0_161]
at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206) ~[na:1.8.0_161]
at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188) ~[na:1.8.0_161]
at java.net.PlainSocketImpl.connect(PlainSocketImpl.java:172) ~[na:1.8.0_161]
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392) ~[na:1.8.0_161]
at java.net.Socket.connect(Socket.java:589) ~[na:1.8.0_161]
at com.mongodb.connection.SocketStreamHelper.initialize(SocketStreamHelper.java:59) ~[mongodb-driver-core-3.6.3.jar:na]
at com.mongodb.connection.SocketStream.open(SocketStream.java:57) ~[mongodb-driver-core-3.6.3.jar:na]
... 3 common frames omitted
2018-05-02 12:03:07.507 INFO 9560 --- [ main] org.mongodb.driver.connection : Opened connection [connectionId{localValue:7, serverValue:181}] to localhost:27001
2018-05-02 12:03:07.508 INFO 9560 --- [ main] org.mongodb.driver.connection : Closed connection [connectionId{localValue:7, serverValue:181}] to localhost:27001 because the pool has been closed.
2018-05-02 12:03:07.511 INFO 9560 --- [ main] o.apache.catalina.core.StandardService : Stopping service [Tomcat]
2