Как добавить запись в MongoDb, используя Apache Kafka в NodeJS? - PullRequest
0 голосов
/ 02 октября 2018

Я экспериментирую с Apache Kafka, NodeJS и MongoDB.У меня есть простая программа, которая просто добавляет мое имя в mongodb для практики.Я пытаюсь соединить это с Кафкой, и это работает не так, как мне бы хотелось.См. Код ниже.

index.html

<!DOCTYPE html>
<html>
<head>
</head>

 <body>
 <form method="post" action="/addname">
 <label>Enter Your Name</label><br>
 <input type="text" name="firstName" placeholder="Enter first name..." 
required>
 <input type="text" name="lastName" placeholder="Enter last name..." 
   required>
 <input type="submit" value="Add Name">
 </form>
 </body>
</html>

index.js

const express = require("express");
const mongoose = require("mongoose");
const bodyParser = require('body-parser');
const kafkaSend = require('./kafka/kafkaScripts/producer.js');
const kafkaRead = require('./kafka/kafkaScripts/consumer.js');
const app = express();
const port = 3000;

app.use(bodyParser.json());
app.use(bodyParser.urlencoded({ extended: true }));

mongoose.Promise = 
global.Promise;mongoose.connect("mongodb://localhost:27017/q-research");

var nameSchema = new mongoose.Schema({
   firstName: String,
   lastName: String
});

var User = mongoose.model("User", nameSchema);



app.post("/addname", (req, res) => {
   var myData = new User(req.body);
   myData.save()
   .then(item => {
       let testObj ={
           type: item,
           userId:'userID',
           sessionId: 'whateverSessionId',
           data: item

       };

       kafkaSend.sendRecord(testObj,function(err,data){
          if(err){
             console.log('error: ', err);
          }
          else{
             res.send("item saved to database");
          }

       });

   })
   .catch(err => {
      console.log('err: ', err);
      res.status(400).send("unable to save to database");
   });

});

/* app start info */
app.use("/", (req, res) => {
   res.sendFile(__dirname + "/index.html");
});

app.get("/", (req, res) => {
   res.send("Hello World");
});

app.listen(port, () => {
   console.log("Server listening on port " + port);
});

Manufacturer.js

const kafka = require('kafka-node'),
uuid = require('uuid');

const client = new kafka.Client("localhost:2181", 8, {
   sessionTimeout: 300,
   spinDelay: 100,
   retries: 2
});

const producer = new kafka.HighLevelProducer(client);
producer.on("ready", function() {
  console.log("Kafka Producer is connected and ready.");
});

// For this demo we just log producer errors to the console.
producer.on("error", function(error) {
   console.error(error);
});

const KafkaService = {
   sendRecord: ({ type, userId, sessionId, data }, callback = () => {}) => {
      if (!userId) {
          return callback(new Error(`A userId must be provided.`));
      }

      const event = {
          id: uuid.v4(),
          timestamp: Date.now(),
          userId: userId,
          sessionId: sessionId,
          type: type,
          data: data
      };

       const buffer = new Buffer.from(JSON.stringify(event));

       // Create a new payload
       const record = [
           {
               topic: "webevents.dev",
               messages: buffer,
               attributes: 1 /* Use GZip compression for the payload */
           }
       ];

       //Send record to Kafka and log result/error
       producer.send(record, callback);
   }
};
module.exports = KafkaService;

Я получаю сообщение об ошибке ниже.Итак, вот мой вопрос. Я считаю, что мне нужно изменить свой тип в моем тестовом объекте, чтобы он мог работать.Я не уверен в этом.Это могло бы и дополнительные вещи.error

1 Ответ

0 голосов
/ 02 октября 2018

Все, что мне нужно было сделать, это указать на мой виртуальный ящик вместо моего локального хоста, поскольку моя кафка находится в окне Ubuntu.

const client = new kafka.Client("10.10.205.17:2181, 8, {
   sessionTimeout: 300,
   spinDelay: 100,
   retries: 2
});
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...