Как подключить AWS Bitnami Certified Kafka AMI к среде Elastic Beanstalk nodejs с помощью kafka-node - PullRequest
4 голосов
/ 18 июня 2019

Я пытаюсь подключить Сертифицированный Bitnami Kafka AMI со средой Elastic Beanstalk nodejs, используя kafka-node , как это сделать?

После установки apache Kafkaлокально и успешно протестировав его с Kafka-узлом, я хотел протестировать свое приложение с сервером AWS kafka.

Я настроил своих AWS Bitnami Certified Kafka AMI слушателей, чтобы они соответствовали моему общедоступному DNS (IPv4), и открыл порты 9092 и 2181 во входящих правилах, например:

Type            protocol     port    source

Custom TCP Rule    TCP       9092    0.0.0.0/0
Custom TCP Rule    TCP       2181    0.0.0.0/0

#server.properties    
listeners=SASL_PLAINTEXT://<Public DNS (IPv4) from AWS>:9092
#   EXAMPLE:
#     listeners = PLAINTEXT://your.host.name:9092
#listeners=PLAINTEXT://<Public DNS (IPv4) from AWS>:9092

# Hostname and port the broker will advertise to producers and consumers. 
# If not set it uses the value for "listeners" if configured. Otherwise, it  
# will use the value returned from 
# java.net.InetAddress.getCanonicalHostName().
advertised.listeners=SASL_PLAINTEXT://<Public DNS (IPv4) from AWS>:9092

# root directory for all kafka znodes.
zookeeper.connect=<Public DNS (IPv4) from AWS>:2181

Я настраиваю своего производителя, используя kafka-node, следующим образом:

var Producer = kafka.Producer,
client = new kafka.KafkaClient({ kafkaHost: <kafka-public-ip>:9092}),
producer = new Producer(client);
producer.on('ready', function () {
console.log('Producer is ready');
});
producer.on('error', function (err) {
console.log('Producer is in error state');
console.log(err);
})

kafka-node генерирует ошибку времени ожидания Error: Unable to find available brokers to try

Я проверилпорт по умолчанию 22 с telnet open <kafka-instance-public-ip> 22, и он работал, но порт 9092 не работает.

Битнами Кафка AMI вопросы вкратце :

1- Как настроить БитнамиKafka AMI с AWS для удаленного доступа

1 Ответ

3 голосов
/ 19 июня 2019

, так как я настроил это следующим образом: это 2 файла, которые могут работать и требуют только экспресс и kafka-node@3.0.1

// consumer.js
const kafka = require('kafka-node'),
    Consumer = kafka.Consumer,
    client = new kafka.Client('<IP of kafka server>:2181');
    consumer = new Consumer(client,
        [{ topic: '<>'}]
    );
console.log('listening')
consumer.on('message', function (message) {
    console.log(message);
});

consumer.on('error', function (err) {
    console.log('Error:',err);
})

consumer.on('offsetOutOfRange', function (err) {
    console.log('offsetOutOfRange:',err);
})

Это подключение к зоопарку, поэтому яЯ думаю, что вам понадобится версия 3.0.1 kafka-узла, поэтому при установке вам потребуется

npm install --save kafka-node@3.0.1

для прямого подключения к брокеру, вам, возможно, придется выяснить это самостоятельно.

// producer.js
const express = require('express');
const kafka = require('kafka-node');

const app = express();
const bodyParser = require('body-parser');

app.use(bodyParser.json()); // to support JSON-encoded bodies
app.use(bodyParser.urlencoded({ extended: true }));

const { Producer } = kafka;
const client = new kafka.Client('<IP of kafka server>:2181');
const producer = new Producer(client);

producer.on('ready', () => {
  console.log('Producer is ready');
});

producer.on('error', err => {
  console.log('Producer is in error state');
  console.log(err);
});

app.post('/kafkaproducer', (req, res) => {
  const sentMessage = JSON.stringify(req.body.message);
  const payloads = [
    { topic: req.body.topic, messages: sentMessage, partition: 0 },
  ];
  producer.send(payloads, (err, data) => {
    if (data) {
      res.json(data);
    }
    if (err) {
      res.send(err);
    }
  });
});

app.get('/',function(req,res){
    res.json({greeting:'Kafka Producer'})
});

app.listen(5001,function(){
    console.log('Kafka producer running at 5001')
})

вы можете использовать почтальона для отправки http-запроса на http://localhost:5001/kafkaproducer в следующем формате

{
  topic: '<TOPIC YOU WANT>',
  messages: '<Can be any format you want even a json but i would advise just 
    testing with a basic string at first>'
}

, тогда потребитель получит сообщение, но убедитесь, что тема имеетбыл создан на сервере kafka, и у вас есть правильная тема для вашего потребителя.

в примечании, если вы использовали экземпляр EC2, вы могли бы объединить их

const express = require('express');
const kafka = require('kafka-node');

const app = express();
const bodyParser = require('body-parser');

app.use(bodyParser.json()); // to support JSON-encoded bodies
app.use(bodyParser.urlencoded({ extended: true }));

const { Producer, Consumer } = kafka;
const client = new kafka.Client('13.56.240.35:2181');
const producer = new Producer(client);
consumer = new Consumer(client,
    [{ topic: 'memes-to-mturk'}]
);

producer.on('ready', () => {
  console.log('Producer is ready');
});

producer.on('error', err => {
  console.log('Producer is in error state');
  console.log(err);
});

consumer.on('message', function (message) {
    console.log(message);
});

consumer.on('error', function (err) {
    console.log('Error:',err);
})


app.get('/',function(req,res){
    res.json({greeting:'Kafka Producer'})
});

app.post('/kafkaproducer', (req, res) => {
  const sentMessage = JSON.stringify(req.body.message);
  console.log(sentMessage);
  const payloads = [
    { topic: req.body.topic, messages: sentMessage, partition: 0 },
  ];
  producer.send(payloads, (err, data) => {
    if (data) {
      res.json(data);
    }
    if (err) {
      res.send(err);
    }
  });
});

app.listen(5002,function(){
    console.log('Kafka producer running at 5001')
})
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...