Как выслушать изменения в коллекции MongoDB? - PullRequest
180 голосов
/ 14 марта 2012

Я создаю своего рода фоновую систему очередей заданий с MongoDB в качестве хранилища данных. Как я могу «прослушать» вставки в коллекцию MongoDB, прежде чем создавать рабочих для обработки работы? Нужно ли опрашивать каждые несколько секунд, чтобы увидеть, есть ли какие-либо изменения с прошлого раза, или есть ли способ, которым мой сценарий может ожидать вставки? Это PHP-проект, над которым я работаю, но не стесняйтесь отвечать на Ruby или не зависимо от языка.

Ответы [ 10 ]

108 голосов
/ 14 марта 2012

То, о чем вы думаете, звучит очень похоже на триггеры.MongoDB не имеет никакой поддержки триггеров, однако некоторые люди «свернули свои собственные», используя некоторые приемы.Ключевым моментом здесь является оплог.

Когда вы запускаете MongoDB в наборе реплик, все действия MongoDB записываются в журнал операций (известный как оплог).Оплог - это просто список изменений, внесенных в данные.Реплики Устанавливает функцию, прослушивая изменения в этом журнале операций и затем применяя изменения локально.

Это звучит знакомо?

Я не могу описать весь процесс здесь, это несколько страниц документации, но инструменты, которые вам нужны, доступны.

Первые несколько записей в оплоге - Краткое описание - Макет local коллекции (которая содержит оплог)

Вы также захотитекредитное плечо курсоры с хвостовой опорой .Это даст вам возможность прослушивать изменения, а не опрашивать их.Обратите внимание, что для репликации используются настраиваемые курсоры, поэтому это поддерживаемая функция.

97 голосов
/ 26 апреля 2012

MongoDB имеет то, что называется capped collections и tailable cursors, что позволяет MongoDB передавать данные слушателям.

A capped collection по сути являетсяколлекция, которая имеет фиксированный размер и допускает только вставки.Вот как это будет выглядеть:

db.createCollection("messages", { capped: true, size: 100000000 })

MongoDB Tailable курсоры ( оригинальный пост Джонатана Х. Заработная плата )

Ruby

coll = db.collection('my_collection')
cursor = Mongo::Cursor.new(coll, :tailable => true)
loop do
  if doc = cursor.next_document
    puts doc
  else
    sleep 1
  end
end

PHP

$mongo = new Mongo();
$db = $mongo->selectDB('my_db')
$coll = $db->selectCollection('my_collection');
$cursor = $coll->find()->tailable(true);
while (true) {
    if ($cursor->hasNext()) {
        $doc = $cursor->getNext();
        print_r($doc);
    } else {
        sleep(1);
    }
}

Python ( Robert Stewart)

from pymongo import Connection
import time

db = Connection().my_db
coll = db.my_collection
cursor = coll.find(tailable=True)
while cursor.alive:
    try:
        doc = cursor.next()
        print doc
    except StopIteration:
        time.sleep(1)

Perl ( Макс. )

use 5.010;

use strict;
use warnings;
use MongoDB;

my $db = MongoDB::Connection->new;
my $coll = $db->my_db->my_collection;
my $cursor = $coll->find->tailable(1);
for (;;)
{
    if (defined(my $doc = $cursor->next))
    {
        say $doc;
    }
    else
    {
        sleep 1;
    }
}

Дополнительные ресурсы:

Ruby / Node.js Учебное пособие, которое проведет вас черезсоздание приложения, которое прослушивает вставки в закрытой коллекции MongoDB.

Статья, в которой более подробно рассматривается настраиваемые курсоры.

PHP, Ruby, Python и Perl примеры использования настраиваемых курсоров.

37 голосов
/ 07 сентября 2017

Начиная с MongoDB 3.6 появится новый API уведомлений под названием Change Streams, который вы можете использовать для этого.См. этот пост в блоге для примера .Пример из этого:

cursor = client.my_db.my_collection.changes([
    {'$match': {
        'operationType': {'$in': ['insert', 'replace']}
    }},
    {'$match': {
        'newDocument.n': {'$gte': 1}
    }}
])

# Loops forever.
for change in cursor:
    print(change['newDocument'])
29 голосов
/ 07 февраля 2018

Проверьте это: Изменить потоки

10 января 2018 г. - Выпуск 3,6

* РЕДАКТИРОВАТЬ: я написал статью о том, как это сделать https://medium.com/riow/mongodb-data-collection-change-85b63d96ff76

https://docs.mongodb.com/v3.6/changeStreams/


Это новое в mongodb 3.6 https://docs.mongodb.com/manual/release-notes/3.6/ 2018/01/10

$ mongod --version
db version v3.6.2

Для использования changeStreams база данных должна быть Набор репликации

Подробнее о наборах репликации: https://docs.mongodb.com/manual/replication/

По умолчанию ваша база данных будет « Standalone ».

Как преобразовать автономный набор реплик: https://docs.mongodb.com/manual/tutorial/convert-standalone-to-replica-set/


Следующий пример представляет собой практическое применение того, как вы можете использовать это. * Специально для Node.

/* file.js */
'use strict'


module.exports = function (
    app,
    io,
    User // Collection Name
) {
    // SET WATCH ON COLLECTION 
    const changeStream = User.watch();  

    // Socket Connection  
    io.on('connection', function (socket) {
        console.log('Connection!');

        // USERS - Change
        changeStream.on('change', function(change) {
            console.log('COLLECTION CHANGED');

            User.find({}, (err, data) => {
                if (err) throw err;

                if (data) {
                    // RESEND ALL USERS
                    socket.emit('users', data);
                }
            });
        });
    });
};
/* END - file.js */

Полезные ссылки:
https://docs.mongodb.com/manual/tutorial/convert-standalone-to-replica-set
https://docs.mongodb.com/manual/tutorial/change-streams-example

https://docs.mongodb.com/v3.6/tutorial/change-streams-example
http://plusnconsulting.com/post/MongoDB-Change-Streams

18 голосов
/ 17 декабря 2017

MongoDB версии 3.6 теперь включает потоки изменений, которые, по сути, представляют собой API поверх OpLog, что позволяет использовать сценарии, подобные триггерам / уведомлениям.

Вот ссылка на пример Java: http://mongodb.github.io/mongo-java-driver/3.6/driver/tutorials/change-streams/

Пример NodeJS может выглядеть примерно так:

 var MongoClient = require('mongodb').MongoClient;
    MongoClient.connect("mongodb://localhost:22000/MyStore?readConcern=majority")
     .then(function(client){
       let db = client.db('MyStore')

       let change_streams = db.collection('products').watch()
          change_streams.on('change', function(change){
            console.log(JSON.stringify(change));
          });
      });
3 голосов
/ 03 августа 2015

В качестве альтернативы, вы можете использовать стандартный метод Mongo FindAndUpdate и в обратном вызове запускать событие EventEmitter (в узле) при выполнении обратного вызова.

Любые другие части приложения или архитектуры, прослушивающие это событие, будут уведомлены об обновлении, и любые соответствующие данные также будут отправлены туда. Это действительно простой способ получения уведомлений от Mongo.

2 голосов
/ 25 мая 2017

Многие из этих ответов дадут вам только новые записи, а не обновления и / или крайне неэффективны

Единственный надежный и эффективный способ сделать это - создать настраиваемый курсор налокальная коллекция db: oplog.rs, чтобы получить ВСЕ изменения в MongoDB и делать с ней все, что вы захотите.(MongoDB даже делает это внутренне более или менее для поддержки репликации!)

Объяснение содержания оплога: https://www.compose.com/articles/the-mongodb-oplog-and-node-js/

Пример библиотеки Node.js, которая предоставляет API для того, что являетсядоступно для оплога: https://github.com/cayasso/mongo-oplog

1 голос
/ 08 ноября 2016

На самом деле, вместо просмотра вывода, почему вы не получаете уведомления, когда вставляется что-то новое, используя промежуточное программное обеспечение, предоставленное схемой mongoose

Вы можете перехватить событие вставки нового документа и сделать что-нибудь после этой вставки

1 голос
/ 24 августа 2016

Существует рабочий пример Java, который можно найти здесь .

 MongoClient mongoClient = new MongoClient();
    DBCollection coll = mongoClient.getDatabase("local").getCollection("oplog.rs");

    DBCursor cur = coll.find().sort(BasicDBObjectBuilder.start("$natural", 1).get())
            .addOption(Bytes.QUERYOPTION_TAILABLE | Bytes.QUERYOPTION_AWAITDATA);

    System.out.println("== open cursor ==");

    Runnable task = () -> {
        System.out.println("\tWaiting for events");
        while (cur.hasNext()) {
            DBObject obj = cur.next();
            System.out.println( obj );

        }
    };
    new Thread(task).start();

Ключ ВАРИАНТЫ ОПРОСА приведены здесь.

Также вы можете изменить запрос поиска, если вам не нужно каждый раз загружать все данные.

BasicDBObject query= new BasicDBObject();
query.put("ts", new BasicDBObject("$gt", new BsonTimestamp(1471952088, 1))); //timestamp is within some range
query.put("op", "i"); //Only insert operation

DBCursor cur = coll.find(query).sort(BasicDBObjectBuilder.start("$natural", 1).get())
.addOption(Bytes.QUERYOPTION_TAILABLE | Bytes.QUERYOPTION_AWAITDATA);
0 голосов
/ 14 марта 2019

Существует потрясающий набор доступных сервисов, который называется MongoDB Stitch . Посмотрите на функции стежков / триггеры . Обратите внимание, что это облачная платная услуга (AWS). В вашем случае, при вставке, вы можете вызвать пользовательскую функцию, написанную на javascript.

enter image description here

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...