Kafka Producer с несколькими потоками - Java - PullRequest
1 голос
/ 09 мая 2019

Я хочу запустить производителя Кафки, используя несколько потоков.Ниже приведен код, который я опробовал.Я не знаю, как реализовать потоки в Kafka, так как не очень разбираюсь в программировании потоков.Ниже приведен код для моего производителя.

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class KafkaProducerWithThread  {
    //init params
    final String bootstrapServer = "127.0.0.1:9092";
    final String topicName = "spark-data-topic";
    final String csvFileName = "unique_products.csv";
    final static int MAX_THREAD = 2;  //created number of threads

    //Logger
    final Logger logger = LoggerFactory.getLogger(KafkaProducerWithThread.class);

    public KafkaProducerWithThread() throws FileNotFoundException {
    }

    public static void main(String[] args) throws IOException {

        new KafkaProducerWithThread().runProducer();
    }

    public void runProducer() throws IOException {

        //Read the CSV file from Resources folder as BufferedReader
        ClassLoader classLoader = new KafkaProducerWithThread().getClass().getClassLoader();
        BufferedReader reader = new BufferedReader(new FileReader(classLoader.getResource(csvFileName).getFile()));

        //Create a Kafka Producer
        org.apache.kafka.clients.producer.KafkaProducer<String, String> producer = createKafkaProducer();

        //Kafka Producer Metrics
        Metric requestTotalMetric = null;
        for (Map.Entry<MetricName, ? extends Metric> entry : producer.metrics().entrySet()) {
            if ("request-total".equals(entry.getKey().name())) {
                requestTotalMetric = entry.getValue();
            }
        }

        //Thread
        ExecutorService executorService = Executors.newFixedThreadPool(MAX_THREAD);


        //Read the CSV file line by line
        String line = "";
        int i = 0;
        while ((line = reader.readLine()) != null) {
            i++;
            String key = "products_" + i;

            //Create a ProducerRecord
            ProducerRecord<String, String> csvProducerRecord = new ProducerRecord<>(topicName, key, line.trim());

            //Send the data - Asynchronously
            producer.send(csvProducerRecord, new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    //executes every time a record is sent successfully or an exception is thrown
                    if (e == null) {
                        //the record was sent successfully
//                        logger.info("Received new metadata. \n" +
//                                "Topic: " + recordMetadata.topic() + "\n" +
//                                "Partition: " + recordMetadata.partition() + "\n" +
//                                "Offset: " + recordMetadata.offset() + "\n" +
//                                "Timestamp: " + recordMetadata.timestamp());
                    } else {
                        logger.error("Error while producing", e);
                    }
                }
            });

            if (i % 1000 == 0){
                logger.info("Record #: " + i + " Request rate: " + requestTotalMetric.metricValue());
            }

        }

        //Adding a shutdown hook
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            logger.info("Stopping the Producer!");
            producer.flush();
            producer.close();
            logger.info("Stopped the Producer!");
        }));
    }

    public org.apache.kafka.clients.producer.KafkaProducer<String, String> createKafkaProducer() {
        //Create Producer Properties
        Properties properties = new Properties();
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
        properties.setProperty(ProducerConfig.ACKS_CONFIG, "all");
        properties.setProperty(ProducerConfig.RETRIES_CONFIG, "5");
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true"); // For an idempotent producer
        //kafka can detect whether it's a duplicate data based on the producer request id.

        //Create high throughput Producer at the expense of latency & CPU
        properties.setProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
        properties.setProperty(ProducerConfig.LINGER_MS_CONFIG, "60");
        properties.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, Integer.toString(32 * 1024)); //32KB batch size

        //Create Kafka Producer
        org.apache.kafka.clients.producer.KafkaProducer<String, String> csvProducer = new org.apache.kafka.clients.producer.KafkaProducer<String, String>(properties);
        return csvProducer;
    }
}

Может кто-нибудь помочь мне в реализации потоков в моей программе производителя Kafka?Мой продюсер будет выпускать более миллиона записей, поэтому я хочу реализовать потоки для того же.Мне известно о ExecutorService, используемом для программирования потоков, но я не уверен, как реализовать в этом случае.Спасибо.

1 Ответ

3 голосов
/ 09 мая 2019
  • создайте класс MessageSender, как указано ниже.
  • после создания класса продюсера создайте новый объект MesssageSender, взяв запись производителя и продюсера в качестве аргументов конструктора.
  • вызовите executorService.submit () для выполнения задачи.

          class Producer {
             ExecutorService executorService = 
              Executors.newFixedThreadPool(MAX_THREAD);
             //Read the CSV file line by line
             String line = "";
             int i = 0;
              while ((line = reader.readLine()) != null) {
                 //create produver record
                 ProducerRecord<String, String> csvProducerRecord = new ProducerRecord<>(topicName, key, line.trim());
                 MessageSender sendMessage= new MessageSender(csvProducerRecord,producer);
                executorService.submit()...
              }
                }
    
            //Thread class          
            class MessageSender implements Runnable<>{
             MessageSender(Producerrecord,producer{
            //store in class level variable in thread class
            }
            public void run(){
                producer.send(csvProducerRecord...);
            }
    
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...