Я использую kafka-node для чтения потоковых данных и передачи их в мое веб-приложение с помощью веб-сокетов, используя NodeJS. Это прекрасно работает, если я могу определить сервер-производитель kafka и топи c, которые меня интересуют, однако для моего случая использования конечные пользователи будут вводить сервер-производитель kafka и топи c, а мой NodeJS backend будет нести ответственность за получение этого запроса и за создание соответствующих соединений kafka / websocket.
Моя идея заключалась в следующем:
Создать API отдыха, в который веб-приложение могло бы отправлять запросы для создания нового соединения kafka для потребителей / веб-сокетов ( / registerTopi c)
Сохранить новых потребителей kafka в глобальном массиве при создании нового потребителя kafka, чтобы впоследствии я мог приостановить или возобновить поток с помощью другого вызова API rest ( / pauseTopi c и / resumeTopi c)
Я столкнулся с проблемами при попытке переместить код WebSocket в /registerTopic... Когда я делаю это, все работает очень странно, и я внезапно получаю 1000x сообщений сразу, а затем 40-50x сообщений каждую секунду, хотя производитель kafka отправляет только 1 сообщение в секунду. Любые идеи о том, как я могу заставить это работать?
const express = require("express");
const app = express();
const cors = require('cors');
const bodyParser = require('body-parser');
const fs = require('fs');
app.use(cors());
app.use(bodyParser.json());
app.use(bodyParser.urlencoded({ extended: true }));
var WebSocketServer = require('websocket').server;
var http = require('http');
var https = require('https');
var kafka = require('kafka-node');
var topics = [];
var privateKey = fs.readFileSync('PATH', 'utf8');
var certificate = fs.readFileSync('PATH', 'utf8');
var credentials = { key: privateKey, cert: certificate };
var consumers = new Set();
// This was working without any issues before I tried to make this dynamic!
/* var Consumer = kafka.Consumer,
client = new kafka.KafkaClient('localhost:9092'),
consumer = new Consumer(
client, [{ topic: 'numtest', partition: 0 }], { autoCommit: false }); */
var server = http.createServer(function (request, response) {
console.log(' Request recieved : ' + request.url);
response.writeHead(404);
response.end();
});
server.listen(8080, function () {
console.log('Listening on port : 8080');
});
webSocketServer = new WebSocketServer({
httpServer: server,
autoAcceptConnections: false
});
function iSOriginAllowed(origin) {
return true;
}
// This was working without any issues before I tried to make this dynamic!
/* webSocketServer.on('request', function (request) {
if (!iSOriginAllowed(request.origin)) {
request.reject();
console.log(' Connection from : ' + request.origin + ' rejected.');
return;
}
let connection = request.accept('echo-protocol', request.origin);
console.log(' Connection accepted : ' + request.origin);
connection.on('message', function (message) {
if (message.type === 'utf8') {
console.log('Received Message: ' + message.utf8Data);
}
});
consumer.on('message', function (message) {
console.log('msg');
connection.sendUTF(message.value);
});
connection.on('close', function (reasonCode, description) {
console.log('Connection ' + connection.remoteAddress + ' disconnected.');
});
}); */
var httpsServer = http.createServer(credentials, app);
httpsServer.listen(3000, () => {
console.log("Server running on port 3000");
});
app.get("/getTopics", (req, res, next) => {
res.json(topics);
});
app.post("/registerTopic", (req, res) => {
try {
var client = new kafka.KafkaClient(req.body.host);
var Consumer = kafka.Consumer;
consumer = new Consumer(
client, [{ topic: req.body.topic, partition: 0 }], { autoCommit: false });
let consumerExists = false;
for (let c = 0; c < [...consumers].length; c++) {
if ([...consumers][c].topic == req.body.topic && [...consumers][c].sessionId == req.body.sessionId) {
consumerExists = true;
}
}
if (!consumerExists) {
consumers.add({ 'topic': req.body.topic, 'sessionId': req.body.sessionId, 'consumer': consumer });
}
client.loadMetadataForTopics([], function (error, results) {
Object.keys(results[1].metadata).forEach(function (key) {
var value = results[1].metadata[key];
if (!value['0'].topic.includes('__') && !value['0'].topic.includes('offset')) {
topics.push({ 'producer': req.body.host, 'topic': value['0'].topic });
}
});
});
webSocketServer.on('request', function (request) {
if (!iSOriginAllowed(request.origin)) {
request.reject();
console.log(' Connection from : ' + request.origin + ' rejected.');
return;
}
let connection = request.accept('echo-protocol', request.origin);
console.log(' Connection accepted : ' + request.origin);
connection.on('message', function (message) {
if (message.type === 'utf8') {
console.log('Received Message: ' + message.utf8Data);
}
});
consumer.on('message', function (message) {
console.log('msg');
connection.sendUTF(message.value);
});
connection.on('close', function (reasonCode, description) {
console.log('Connection ' + connection.remoteAddress + ' disconnected.');
});
});
res.json("Working");
} catch (error) {
console.error(error);
res.status(400).send('Unable to register new topic')
}
});
app.post("/pauseTopic", (req, res) => {
try {
console.log(req.body);
let filteredConsumer = [...consumers].filter(function (item) {
console.log(req.body.topic, item.sessionId);
if (item.topic == req.body.topic && item.sessionId == req.body.sessionId) {
return c;
}
});
console.log(filteredConsumer);
//filteredConsumer[0].consumer.pause();
res.json("Working");
} catch (error) {
console.error(error);
res.status(400).send('Unable to register new topic')
}
});
app.post("/resumeTopic", (req, res) => {
try {
let filteredConsumer = [...consumers].filter(function (item) {
if (item.topic == req.body.topic && item.sessionId == req.body.sessionId) {
return item;
}
});
filteredConsumer[0].consumer.resume();
res.json("Working");
} catch (error) {
console.error(error);
res.status(400).send('Unable to register new topic')
}
});