Клиент Vertx Mongo подавляющим Mongodb с одновременными обновлениями - PullRequest
0 голосов
/ 19 января 2019

Попытка построить реактивную систему с использованием akka, rx, vert.x and mongodb, где поток (нисходящий поток) выглядит в значительной степени:

publisher -> akka streams -> rx streams -> vert.x event bus -> vert.x mongo rx client -> mongo client

Я столкнулся с ситуацией, когда апстрим вызывает слишком много действий по сбору обновлений на mongo, и это заканчивается:

com.mongodb.MongoWaitQueueFullException: Too many threads are already waiting for a connection. Max number of threads (maxWaitQueueSize) of 500 has been exceeded.

updateCollection:

  • запускается одновременно примерно на 10 publishers каждый толкающий элемент каждые 0.1 second

  • исполняется в той же коллекции

  • выполняет добавление нового элемента в массив, встроенный в документ

1) Поскольку это реактивная система, было бы неплохо создать обратное давление на источник, выполнить запрос http только так часто, как это позволяет писать в монго - так, чтобы очередь потоков, ожидающих соединения с монго, не будет расти,

Есть ли какой-нибудь шаблон / пример для такого противодавления с монго, которому нужно следовать, или я должен изобрести и реализовать его самостоятельно?

Как я могу получить доступ и наблюдать количество потоков, ожидающих соединения через клиент Vertx Mongo?

2) При просмотре кода клиента vertx mongo выясняется, что он не поддерживает соединение с mongo открытым, и новый сеанс открывается для каждого действия обновления. Вы можете наблюдать это в io.vertx.ext.mongo.impl.MongoClientImpl.updateCollection(...), где он вызывает MongoCollection.updateOne(...) без передачи параметра ClientSession clientSession.

Хотя 10 одновременных обновлений в секунду кажутся небольшим числом, вопрос заключается в том, может ли быть так, что создание ClientSession занимает много времени и поэтому приводит к тому, что потоки ставятся в очередь?

Кроме того, каковы будут дизайнерские решения, чтобы не кэшировать соединение с mongo в клиенте vertx mongo?

mongostat:

insert query update delete getmore command dirty  used flushes vsize   res qrw  arw net_in net_out conn                time
    *0    *0     32     *0       0   139|0  9.8% 10.8%       0 1.84G  666M 0|0 1|89  29.6k   62.9k  104 Jan 19 02:33:51.980
    *0    *0      6     *0       0     4|0 18.7% 18.7%       0 2.90G 1.50G 0|0 1|100  2.41k   9.59k  104 Jan 19 02:33:59.342
    *0    *0     *0     *0       0     2|0 15.7% 17.2%       0 3.52G 1.60G 0|0  1|97   493b   7.90k  104 Jan 19 02:34:07.480
    *0    *0      9     *0       0     3|0 14.7% 17.2%       0 3.52G 1.57G 0|0 1|100  3.10k   18.7k  104 Jan 19 02:34:10.955
    *0    *0      1     *0       0     1|0 21.4% 23.1%       0 3.52G 1.57G 0|0 1|100   749b   7.46k  104 Jan 19 02:34:19.579
    *0    *0     10     *0       0    16|0 36.7% 37.4%       0 3.57G 1.57G 0|0 1|100  4.79k   73.7k  104 Jan 19 02:34:20.443
    *0    *0     *0     *0       0     9|0 53.6% 54.0%       0 3.62G 1.56G 0|0 1|100  1.33k   47.6k  104 Jan 19 02:34:21.769
    *0    *0      1     *0       0    13|0 54.5% 55.2%       0 3.62G 1.57G 0|0 1|100  1.92k   70.6k  104 Jan 19 02:34:22.659
    *0    *0     *0     *0       0    23|0 70.5% 70.9%       0 3.62G 1.56G 0|0 1|100  2.75k    122k  104 Jan 19 02:34:23.173
    *0    *0     *0     *0       0    31|0 72.1% 72.5%       0 3.62G 1.58G 0|0 1|100  3.56k    153k  104 Jan 19 02:34:23.586

Буду очень признателен за вашу помощь.

1 Ответ

0 голосов
/ 21 января 2019

Я не думаю, что вам нужно обратное резервирование, потому что если вы получаете больше, чем обрабатываете все время, это выделит всю память и исключение курса в вашем случае, но я думаю, что выбор заключается в создании источника излучения, где каждый поток посылает элемент этому эмиттер и использовать mongorepo.saveAll (эмиттер):

процессор1 ->

процессор2 ->

процессор3 ->

процессор4 ->

processor5 -> EMITTING EMITTER - EMITTER СОХРАНИТЬ В MONGO с помощью saveAll (emmiter)

процессор6 ->

процессор7 ->

процессор8 ->

процессор9 ->

процессор10 ->

...