Я экспериментирую с Apache Kafka, NodeJS и MongoDB.У меня есть простая программа, которая просто добавляет мое имя в mongodb для практики.Я пытаюсь соединить это с Кафкой, и это работает не так, как мне бы хотелось.См. Код ниже.
index.html
<!DOCTYPE html>
<html>
<head>
</head>
<body>
<form method="post" action="/addname">
<label>Enter Your Name</label><br>
<input type="text" name="firstName" placeholder="Enter first name..."
required>
<input type="text" name="lastName" placeholder="Enter last name..."
required>
<input type="submit" value="Add Name">
</form>
</body>
</html>
index.js
const express = require("express");
const mongoose = require("mongoose");
const bodyParser = require('body-parser');
const kafkaSend = require('./kafka/kafkaScripts/producer.js');
const kafkaRead = require('./kafka/kafkaScripts/consumer.js');
const app = express();
const port = 3000;
app.use(bodyParser.json());
app.use(bodyParser.urlencoded({ extended: true }));
mongoose.Promise =
global.Promise;mongoose.connect("mongodb://localhost:27017/q-research");
var nameSchema = new mongoose.Schema({
firstName: String,
lastName: String
});
var User = mongoose.model("User", nameSchema);
app.post("/addname", (req, res) => {
var myData = new User(req.body);
myData.save()
.then(item => {
let testObj ={
type: item,
userId:'userID',
sessionId: 'whateverSessionId',
data: item
};
kafkaSend.sendRecord(testObj,function(err,data){
if(err){
console.log('error: ', err);
}
else{
res.send("item saved to database");
}
});
})
.catch(err => {
console.log('err: ', err);
res.status(400).send("unable to save to database");
});
});
/* app start info */
app.use("/", (req, res) => {
res.sendFile(__dirname + "/index.html");
});
app.get("/", (req, res) => {
res.send("Hello World");
});
app.listen(port, () => {
console.log("Server listening on port " + port);
});
Manufacturer.js
const kafka = require('kafka-node'),
uuid = require('uuid');
const client = new kafka.Client("localhost:2181", 8, {
sessionTimeout: 300,
spinDelay: 100,
retries: 2
});
const producer = new kafka.HighLevelProducer(client);
producer.on("ready", function() {
console.log("Kafka Producer is connected and ready.");
});
// For this demo we just log producer errors to the console.
producer.on("error", function(error) {
console.error(error);
});
const KafkaService = {
sendRecord: ({ type, userId, sessionId, data }, callback = () => {}) => {
if (!userId) {
return callback(new Error(`A userId must be provided.`));
}
const event = {
id: uuid.v4(),
timestamp: Date.now(),
userId: userId,
sessionId: sessionId,
type: type,
data: data
};
const buffer = new Buffer.from(JSON.stringify(event));
// Create a new payload
const record = [
{
topic: "webevents.dev",
messages: buffer,
attributes: 1 /* Use GZip compression for the payload */
}
];
//Send record to Kafka and log result/error
producer.send(record, callback);
}
};
module.exports = KafkaService;
Я получаю сообщение об ошибке ниже.Итак, вот мой вопрос. Я считаю, что мне нужно изменить свой тип в моем тестовом объекте, чтобы он мог работать.Я не уверен в этом.Это могло бы и дополнительные вещи.![error](https://i.stack.imgur.com/mBr4c.png)