MongoDB для индексации Elasticsearch - PullRequest
       1

MongoDB для индексации Elasticsearch

1 голос
/ 24 сентября 2019

Застрял в точке индексирования сбора данных вasticsearch.

Ниже приведен код, который я пытаюсь проиндексировать данные из Монго.

const elasticsearch = require('elasticsearch');
// instantiate an Elas

var bulk = [];


var MongoClient = require('mongodb').MongoClient;
var ObjectID = require('mongodb').ObjectID;
var mongoDBName = 'mydb'; // Name of mongodb goes here
var mongoCollectionName = 'mycollection'; // Collection name of mongodb goes here
var connectionString = 'mongodb://127.0.0.1:27017/'; // put username and password for mongo here

var esIndexName = 'new-collection'; // Elasticsearch index name will go here
var bulk = [];
const client = new elasticsearch.Client({
   hosts: [ 'http://localhost:9200']
});
// ping the client to be sure Elasticsearch is up
client.ping({
     requestTimeout: 30000,
 }, function(error) {
 // At this point, eastic search is down, please check your Elasticsearch service
     if (error) {
         console.error('Elasticsearch cluster is down!');
     } else {
         console.log('Everything is ok');
     }
 });


MongoClient.connect(connectionString+mongoDBName, function(err, db) {
    if(err) throw err;

   // for each object in a collection
   var collection = db.collection(mongoCollectionName);
   var counter = 0;
   collection.find().each(function(err, item, response, status) {
       console.log(item)
    Array.from(item).forEach(itemdata => {
        bulk.push({index:{ 
                        _index: esIndexName, 
                        _type: mongoCollectionName,
                    }          
                })
        bulk.push(itemdata)
        })
        //perform bulk indexing of the data passed
        client.bulk({body:bulk}, function( err, response  ){ 
            if( err ){ 
                console.log("Failed Bulk operation".red, err) 
            } else { 
                console.log("Successfully imported %s".green, mongoCollectionName.length); 
            } 
            console.log(response);
        });

    if(item != null) {    
        if(counter % 100 == 0) console.log( "Syncing object id: "+ item['_id'] + " #: " + counter);
        client.indices.create(
         { index: esIndexName },
         function(error, response) {
            if (error) {
                     console.log(error);
                 } else {
               console.log("created a new index", response);
              }
         }
       );
   }
     counter += 1;
   });
});

Итак, я пытаюсьДля индексации данных вasticsearch я смог создать индекс коллекции, но не смог вставить данные в индекс эластичного поиска.Может ли кто-нибудь помочь мне здесь?Где я ошибаюсь, и какую ошибку я здесь делаю.Здесь я использую nodejs, просто для проверки функцию, позже добавлю лямбда-функцию для обновления / удаления и какие-либо изменения.

Ответы [ 3 ]

0 голосов
/ 24 сентября 2019

Вы можете сделать logstash для импорта данных из mongo db в эластичный поиск. Пожалуйста, найдите прилагаемую конфигурацию для вашей справки.

 input {
    mongodb {
    codec => “json”
    uri => ‘mongodb://localhost:27017/NewDb’
    placeholder_db_dir => ‘/home/devbrt.shukla/Desktop/scalaoutput/ELK/logstash-6.4.1/db_dir’
    placeholder_db_name => ‘Employee_sqlite.db’
    collection => ‘Employee’
    batch_size => 5000
    generateId => ‘true’
    parse_method => “simple”
    }
    }
    filter {
    mutate {
    remove_field => [ “_id” ]
    }
    }
    output {
    elasticsearch {
    hosts => [“localhost:9200”]
    index => “employee-%{+YYYY.MM.dd}”
    }
    stdout { codec => rubydebug } }

В Logstash у нас будет три раздела Input, Filter и Output.

Ввод : получение данных из sql, mongodb, mysql и т. Д.
Фильтр : В этом разделе мы можем преобразовать пользовательский json для индексации вasticsearch.
Выходные данные : В этом разделе мы поместим имя индекса, тип документа и IP-адрес выходного раздела, т.е. эластичный поиск.

0 голосов
/ 24 сентября 2019

Вот решение, которое вы ищете

index.js

//MongoDB client config
var MongoClient = require('mongodb').MongoClient;
var mongoDBName = 'mydb'; // Name of mongodb goes here
var mongoCollectionName = 'mycollection'; // Collection name of mongodb goes here
var connectionString = 'mongodb://127.0.0.1:27017/'; // put username and password for mongo here

//Elasticsearch client config
const { Client } = require('@elastic/elasticsearch')
const esClient = new Client({ node: 'http://localhost:9200' });
var esIndexName = 'new-collection'; // Elasticsearch index name will go here

let bulk = [];

async function indexData() {

  const client = await MongoClient.connect(connectionString, { useNewUrlParser: true })
    .catch(err => { console.log(err); });

  if (!client) {
    return;
  }

  try {

    const db = client.db(mongoDBName);

    let collection = db.collection(mongoCollectionName);
    await collection.find().forEach((doc) => {
      bulk.push({
        index: {
          _index: esIndexName,
        }
      })

      let { _id, ...data } = doc;
      bulk.push(data);
    })
    console.log(bulk);

    await esClient.indices.create({
      index: esIndexName,
    }, { ignore: [400] })

    const { body: bulkResponse } = await esClient.bulk({ refresh: true, body: bulk })

    if (bulkResponse.errors) {
      const erroredDocuments = []
      // The items array has the same order of the dataset we just indexed.
      // The presence of the `error` key indicates that the operation
      // that we did for the document has failed.
      bulkResponse.items.forEach((action, i) => {
        const operation = Object.keys(action)[0]
        if (action[operation].error) {
          erroredDocuments.push({
            // If the status is 429 it means that you can retry the document,
            // otherwise it's very likely a mapping error, and you should
            // fix the document before to try it again.
            status: action[operation].status,
            error: action[operation].error,
            operation: bulk[i * 2],
            document: bulk[i * 2 + 1]
          })
        }
      })
      console.log(erroredDocuments)
    }

    const { body: count } = await esClient.count({ index: esIndexName })
    console.log(count)

  } catch (err) {

    console.log(err);
  } finally {
    client.close();
  }
}

indexData();

package.json

{
  "name": "elastic-node-mongo",
  "version": "1.0.0",
  "description": "Simple example to connect ElasticSearch, MongoDB and NodeJS",
  "main": "index.js",
  "dependencies": {
    "@elastic/elasticsearch": "^7.3.0",
    "mongodb": "^3.3.2",
    "nodemon": "1.18.3"
  },
  "scripts": {
    "dev": "nodemon",
    "start": "node index.js"
  },
  "keywords": [
    "nodejs",
    "node",
    "mongodb",
    "elasticsearch",
    "docker"
  ],
  "author": "Sathishkumar Rakkiasmy",
  "license": "ISC"
}

Пояснения

Мне удалось создать индекс коллекции, но не удалось вставить данные в индекс эластичного поиска.

Выше предложение имеет смысл.Поскольку переменная bulk не изменилась.

См. Ссылки ниже, почему переменная bulk не изменилась.

Почему моя переменная не изменилась после того, как я изменил ее внутри функции?- Ссылка на асинхронный код

Как вернуть ответ от асинхронного вызова?

Чтобы узнать больше об асинхронном программировании

https://developer.mozilla.org/en-US/docs/Learn/JavaScript/Asynchronous

https://developer.mozilla.org/en-US/docs/Learn/JavaScript/Asynchronous/Async_await

0 голосов
/ 24 сентября 2019

Прежде всего, я бы предложил привести в порядок ваш код;очень трудно понять, как вкладываются блоки.

Теперь с вашим кодом есть несколько проблем:

  1. Почему вы делаете Array.from(item).forEach(itemdata => {?item является объектом документа из Mongo, поэтому выполнение Array.from для него не имеет никакого эффекта.
  2. Вы вызываете API bulk внутри обратного вызова .each;Это означает, что вы будете выполнять вызов API для каждого документа.Я не думаю, что это то, что вы хотите.
  3. Вы создаете индекс после массовой операции.Это не верно.Вы должны создать свой индекс ES раз и навсегда перед вставкой документов.Это важно, потому что в будущем вам понадобится более сложная конфигурация для обработки ваших документов.
  4. Ваш ping вызов ES хорош, но он не мешает остальной части кодазапустите, если кластер не работает.

Итак, что вы должны сделать:

  1. Создайте индекс ES перед выполнением итерации по вашим документам.
  2. Итерируйте по своемуДокументы MongoDB и накапливаются в объекте вашего тела.
  3. Если у вас есть пакет документов n, вызовите API bulk и сбросьте ваше тело.
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...