это правильный способ прочитать сообщение через производителя kafka и отправить его в тему - PullRequest
0 голосов
/ 30 мая 2018

Я написал этого производителя Kafka и прочитал файл с рабочего стола, а затем выдвинул данные в файл как значение и сгенерировал ключ самостоятельно, добавляя по одному при каждом чтении каждой строки.Это правильный путь, или я сделал то, чего не должен был делать ??Пожалуйста, нужен совет.Я вижу сообщение в своей теме, но каждое из них связано с ключом, поэтому, если у меня есть сценарий использования, я могу отправить любые данные журнала, подобные этим, если я читаю их извне.Могу ли я использовать данные журнала в качестве значения или есть совершенно другая логика, к которой я должен подходить.Пожалуйста, помогите

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.util.Properties;

import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;


public class SyncProducer {

    public static void main(String[] args) throws IOException {

        File file = new File("/Users/adityaverma/Desktop/ParseData.txt");

        BufferedReader br = new BufferedReader(new FileReader(file));

        Properties properties  = new Properties();
        properties.setProperty("bootstrap.servers","127.0.0.1:9092");
        properties.setProperty("key.serializer",StringSerializer.class.getName()); // our key and values are String
        properties.setProperty("value.serializer",StringSerializer.class.getName());
        properties.setProperty("acks", "1"); 
        properties.setProperty("retries", "3"); 
        properties.setProperty("linger.ms", "1"); 



        Producer<String,String> producer = new org.apache.kafka.clients.producer.KafkaProducer<String,String>(properties);
        // these will go in random partition as we increment the key

        String line = " ";

        int key = 0;
        while((line = br.readLine()) != null){
        //  System.out.println(line);

        ProducerRecord<String,String> producerRecord = new ProducerRecord<String,String>("try_Buffered3Part",Integer.toString(key),line);
         key++;
         System.out.println(key);
        producer.send(producerRecord);

        }
        producer.close();
        System.out.println("exit");
    }

}

Ответы [ 2 ]

0 голосов
/ 30 мая 2018

Это правильный путь

Не ясно, ваши цели.Можете ли вы использовать данные из терминала?Тогда вы производите хорошо.

Вы можете использовать целые числа в качестве ключей.Kafka имеет IntegerSerializer

Использование нулевого значения в качестве ключа или исключение этого параметра является стандартным способом отправки данных в случайный раздел, и вы не нажмете целочисленную перегрузку

Я хочучитать журналы из какого-либо источника через Kafka, а затем записывать их в HDFS.

Если вы просто хотите, чтобы данные журнала были в Hadoop, Fluentd или Logstash могут выполнить это.

Прежде чем вы начнете идти по этому пути с Kafka, вам обязательно нужно выбрать формат данных.Например, Hadoop и Kafka предпочитают Avro или JSON, а не CSV.Confluent имеет много документации по созданию Avro для Kafka

. Вы можете использовать Kafka Connect HDFS Connector или Apache Nifi, чтобы передавать данные Kafka в Hadoop.Не изобретайте колесо, пишущее вашему собственному потребителю.

0 голосов
/ 30 мая 2018

Кажется, хорошо.Вы можете использовать нулевой ключ, если вы в порядке со случайным разделом.

Вы также можете посмотреть на интеграцию logstash и kafka.

https://www.elastic.co/guide/en/logstash/current/plugins-outputs-kafka.html

...