Проблема реализации пакетного чтения MongoDB с набором реплик потока изменений - PullRequest
1 голос
/ 05 февраля 2020

Проблема: Процесс генерации вывода записывает около 300 данных вывода в коллекцию MongoDB в секунду. Функция потока изменений в MongoDB используется другим процессом для считывания этих выводов и последующей обработки. В настоящее время только один вывод данных возвращается при вызове API функции потока изменений (mongoc_change_stream_next ()). Таким образом, в общей сложности 300 таких вызовов требуется, чтобы получить все данные вывода, сохраненные в течение 1 секунды. Однако после каждого чтения требуется около 50 мс времени для выполнения последующей обработки для данных одного / нескольких выводов. Из-за единственной модели возврата данных вводится эффективная задержка 15x . Чтобы решить эту проблему, мы пытаемся реализовать механизм пакетного чтения , встроенный в функцию потока изменений MongoDB . Мы пробовали разные варианты реализации одного и того же, но все равно получали только одни данные после каждого вызова API потока изменений. Есть ли способ решить эту проблему?

Платформа: ОС: Ubuntu 16.04 Пн go - c -драйвер: 1.15.1 Пн go сервер: 4.0.12

Опции опробованы: Установка размера пакета курсора более 1.

int main(void) {
    const char *uri_string = "mongodb://localhost:27017/replicaSet=set0";
    mongoc_change_stream_t *stream;
    mongoc_collection_t *coll;
    bson_error_t error;
        mongoc_uri_t *uri;
    mongoc_client_t *client;

    /*
    * Add the Mongo DB blocking read and scall the inference parse function with the Json
                 * */
    uri = mongoc_uri_new_with_error (uri_string, &error);
    if (!uri) {
        fprintf (stderr,
        "failed to parse URI: %s\n"
        "error message:       %s\n",
        uri_string,
        error.message);
        return -1;
    }

    client = mongoc_client_new_from_uri (uri);
    if (!client) {
        return -1;
    }

    coll = mongoc_client_get_collection (client,  <DB-NAME>, <collection-name>);
    stream = mongoc_collection_watch (coll, &empty, NULL);
    mongoc_cursor_set_batch_size(stream->cursor, 20);
    while (1){
        while (mongoc_change_stream_next (stream, &doc)) {
            char *as_json = bson_as_relaxed_extended_json (doc, NULL); 
            ............
            ............
            //post processing consuming 50 ms of time
            ............
            ............
        }
        if (mongoc_change_stream_error_document (stream, &error, &err_doc)) {
            if (!bson_empty (err_doc)) {
                fprintf (stderr,
                "Server Error: %s\n",
                bson_as_relaxed_extended_json (err_doc, NULL));
            } else {
                fprintf (stderr, "Client Error: %s\n", error.message);
            }
            break;
        }
    }
    return 0;
}

1 Ответ

1 голос
/ 16 марта 2020

В настоящее время при вызове API функции потока изменений (mongoc_change_stream_next ()) выдается только один вывод данных.

Технически дело не в том, что возвращается один документ. Это связано с тем, что mongoc_change_stream_next () выполняет итерацию основного курсора, устанавливая каждый bson для следующего документа. Таким образом, даже размер возвращаемого пакета больше одного, он все равно должен повторяться для каждого документа.

Вы можете попробовать:

  • Создать отдельные потоки для параллельной обработки документов, поэтому вам не придется ждать 50 мс на документ или 15 секунд накопительно.

  • L oop через пакет документов, т.е. 50 кэширует их, затем выполняет пакетную обработку

  • Пакетная обработка их в отдельных потоках ( комбинация двух выше)

...