Динамически создавать потребителей Kafka, которые отправляют данные через веб-сокет - PullRequest
0 голосов
/ 28 января 2020

Я использую kafka-node для чтения потоковых данных и передачи их в мое веб-приложение с помощью веб-сокетов, используя NodeJS. Это прекрасно работает, если я могу определить сервер-производитель kafka и топи c, которые меня интересуют, однако для моего случая использования конечные пользователи будут вводить сервер-производитель kafka и топи c, а мой NodeJS backend будет нести ответственность за получение этого запроса и за создание соответствующих соединений kafka / websocket.

Моя идея заключалась в следующем:

  1. Создать API отдыха, в который веб-приложение могло бы отправлять запросы для создания нового соединения kafka для потребителей / веб-сокетов ( / registerTopi c)

  2. Сохранить новых потребителей kafka в глобальном массиве при создании нового потребителя kafka, чтобы впоследствии я мог приостановить или возобновить поток с помощью другого вызова API rest ( / pauseTopi c и / resumeTopi c)

Я столкнулся с проблемами при попытке переместить код WebSocket в /registerTopic... Когда я делаю это, все работает очень странно, и я внезапно получаю 1000x сообщений сразу, а затем 40-50x сообщений каждую секунду, хотя производитель kafka отправляет только 1 сообщение в секунду. Любые идеи о том, как я могу заставить это работать?

const express = require("express");
const app = express();
const cors = require('cors');
const bodyParser = require('body-parser');
const fs = require('fs');


app.use(cors());
app.use(bodyParser.json());
app.use(bodyParser.urlencoded({ extended: true }));

var WebSocketServer = require('websocket').server;
var http = require('http');
var https = require('https');
var kafka = require('kafka-node');
var topics = [];
var privateKey = fs.readFileSync('PATH', 'utf8');
var certificate = fs.readFileSync('PATH', 'utf8');
var credentials = { key: privateKey, cert: certificate };
var consumers = new Set();

// This was working without any issues before I tried to make this dynamic!
/* var Consumer = kafka.Consumer,
  client = new kafka.KafkaClient('localhost:9092'),
  consumer = new Consumer(
    client, [{ topic: 'numtest', partition: 0 }], { autoCommit: false }); */

var server = http.createServer(function (request, response) {
  console.log(' Request recieved : ' + request.url);
  response.writeHead(404);
  response.end();
});
server.listen(8080, function () {
  console.log('Listening on port : 8080');
});

webSocketServer = new WebSocketServer({
  httpServer: server,
  autoAcceptConnections: false
});

function iSOriginAllowed(origin) {
  return true;
}

// This was working without any issues before I tried to make this dynamic!
/* webSocketServer.on('request', function (request) {
  if (!iSOriginAllowed(request.origin)) {
    request.reject();
    console.log(' Connection from : ' + request.origin + ' rejected.');
    return;
  }

  let connection = request.accept('echo-protocol', request.origin);
  console.log(' Connection accepted : ' + request.origin);
  connection.on('message', function (message) {
    if (message.type === 'utf8') {
      console.log('Received Message: ' + message.utf8Data);
    }
  });
  consumer.on('message', function (message) {
    console.log('msg');
    connection.sendUTF(message.value);
  });
  connection.on('close', function (reasonCode, description) {
    console.log('Connection ' + connection.remoteAddress + ' disconnected.');
  });
}); */

var httpsServer = http.createServer(credentials, app);

httpsServer.listen(3000, () => {
  console.log("Server running on port 3000");
});

app.get("/getTopics", (req, res, next) => {
  res.json(topics);
});

app.post("/registerTopic", (req, res) => {
  try {
    var client = new kafka.KafkaClient(req.body.host);
    var Consumer = kafka.Consumer;
    consumer = new Consumer(
      client, [{ topic: req.body.topic, partition: 0 }], { autoCommit: false });
    let consumerExists = false;
    for (let c = 0; c < [...consumers].length; c++) {
      if ([...consumers][c].topic == req.body.topic && [...consumers][c].sessionId == req.body.sessionId) {
        consumerExists = true;
      }
    }
    if (!consumerExists) {
      consumers.add({ 'topic': req.body.topic, 'sessionId': req.body.sessionId, 'consumer': consumer });
    }
    client.loadMetadataForTopics([], function (error, results) {
      Object.keys(results[1].metadata).forEach(function (key) {
        var value = results[1].metadata[key];
        if (!value['0'].topic.includes('__') && !value['0'].topic.includes('offset')) {
          topics.push({ 'producer': req.body.host, 'topic': value['0'].topic });
        }
      });
    });
    webSocketServer.on('request', function (request) {
      if (!iSOriginAllowed(request.origin)) {
        request.reject();
        console.log(' Connection from : ' + request.origin + ' rejected.');
        return;
      }

      let connection = request.accept('echo-protocol', request.origin);
      console.log(' Connection accepted : ' + request.origin);
      connection.on('message', function (message) {
        if (message.type === 'utf8') {
          console.log('Received Message: ' + message.utf8Data);
        }
      });
      consumer.on('message', function (message) {
        console.log('msg');
        connection.sendUTF(message.value);
      });
      connection.on('close', function (reasonCode, description) {
        console.log('Connection ' + connection.remoteAddress + ' disconnected.');
      });
    });
    res.json("Working");
  } catch (error) {
    console.error(error);
    res.status(400).send('Unable to register new topic')
  }
});

app.post("/pauseTopic", (req, res) => {
  try {
    console.log(req.body);
    let filteredConsumer = [...consumers].filter(function (item) {
      console.log(req.body.topic, item.sessionId);
      if (item.topic == req.body.topic && item.sessionId == req.body.sessionId) {
        return c;
      }
    });
    console.log(filteredConsumer);
    //filteredConsumer[0].consumer.pause();
    res.json("Working");
  } catch (error) {
    console.error(error);
    res.status(400).send('Unable to register new topic')
  }
});

app.post("/resumeTopic", (req, res) => {
  try {
    let filteredConsumer = [...consumers].filter(function (item) {
      if (item.topic == req.body.topic && item.sessionId == req.body.sessionId) {
        return item;
      }
    });
    filteredConsumer[0].consumer.resume();
    res.json("Working");
  } catch (error) {
    console.error(error);
    res.status(400).send('Unable to register new topic')
  }
});
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...