Публикация нескольких сообщений одновременно в паттициях Kafka с использованием многопоточности в целях тестирования для проверки производительности - PullRequest
0 голосов
/ 08 мая 2020
    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericData;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.errors.SerializationException;
    import org.apache.kafka.common.serialization.StringSerializer;

    import java.util.Properties;

 class MultithreadingDemo extends Thread 
 { 
public void run() 
{ 
     Properties props = new Properties();
     props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "xxx:443");
     props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
     props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
     io.confluent.kafka.serializers.KafkaAvroSerializer.class);
     props.put("schema.registry.url", "xxx");
     props.put(ProducerConfig.ACKS_CONFIG, "all");
     props.put("security.protocol", "SSL");
     props.put("ssl.truststore.location", "xxx");
     props.put("ssl.truststore.password", "xxx");
     props.put("ssl.keystore.location", "xxx");
     props.put("ssl.keystore.password", "xxx");
     props.put("ssl.key.password", "xxx");
    KafkaProducer producer = new KafkaProducer(props);
    String userSchema = "{   \"name\": \"MyClass\",   \"type\": \"record\",   \"namespace\": 
      \"com.oop.hts\",   \"fields\": [     {       \"name\": \"appId\",       \"type\": 
\"string\"     },     {       \"name\": \"appName\",       \"type\": \"string\"     },     {       
\"name\": \"groups\",       \"type\": \"string\"     },     {       \"name\": \"subGroups\",       
\"type\": \"string\"     },     {       \"name\": \"jobType\",       \"type\": \"string\"     
 },     {       \"name\": \"appStartTime\",       \"type\": \"string\"     },     {       
 \"name\": \"appEndTime\",       \"type\": \"string\"     },     {       \"name\": 
 \"appDuration\",       \"type\": \"int\"     },     {       \"name\": \"cpuTime\",       
  \"type\": \"int\"     },     {       \"name\": \"runTime\",       \"type\": \"int\"     },     
 {       \"name\": \"memoryUsage\",       \"type\": \"int\"     },     {       \"name\": 
\"appStatus\",       \"type\": \"string\"     },     {       \"name\": \"appResult\",       
\"type\": \"string\"     },     {       \"name\": \"failureREason\",       \"type\": 
\"string\"     },     {       \"name\": \"recordCount\",       \"type\": \"string\"     },     
{       \"name\": \"numexecutors\",       \"type\": \"string\"     },     {       \"name\": 
\"executorcores\",       \"type\": \"string\"     },     {       \"name\": 
\"executormemory\",       \"type\": \"string\"     }   ] }\n" + 
            "";
System.out.println("schema:" + userSchema);
     Schema.Parser parser = new Schema.Parser();
     Schema schema = parser.parse(userSchema);
     GenericRecord avroRecord = new GenericData.Record(schema);
     //avroRecord.put("f1", "value777");
     System.out.println("----" + avroRecord);
     avroRecord.put("appId","spark-d0731a81f1b64f109c5d985c1b2e0011");
     avroRecord.put("appName","H@S-UCR");
     avroRecord.put("groups","");
     avroRecord.put("subGroups","");
     avroRecord.put("jobType","");
     avroRecord.put("appStartTime","2020-04-13T10:02:25.902");
     avroRecord.put("appEndTime","2020-04-13T10:02:25.902");
     avroRecord.put("appDuration",4110);
     avroRecord.put("cpuTime",337468);
     avroRecord.put("runTime",1198987);
     avroRecord.put("memoryUsage",234933352);
     avroRecord.put("appStatus","Running");
     avroRecord.put("appResult","InProgress");
     avroRecord.put("failureREason","");
     avroRecord.put("recordCount","0");
     avroRecord.put("numexecutors","25");
     avroRecord.put("executorcores","15");
     avroRecord.put("executormemory","60g");
     System.out.println("----"+ avroRecord);

     ProducerRecord<String, GenericRecord> record = new ProducerRecord<String, 
     GenericRecord>("kaas.topic", avroRecord);
     try {
         producer.send(record);
         System.out.println("Successfully produced the records to the Kafka topic : 
   kaas.dqhats.target ");
     } catch(SerializationException e) {
         System.out.println("An Exception occured" + e.getMessage());
         e.printStackTrace();
     }

   } 
  } 

 // Main Class 
 public class Multithread 
 { 
 public static void main(String[] args) 
 { 
    int n = 8; // Number of threads 
    for (int i=0; i<n; i++) 
    { 
        MultithreadingDemo object = new MultithreadingDemo(); 

        object.start(); 

    } 

   } 
 } 

Я хочу создать несколько сообщений для разделов kafka с использованием многопоточности. (Это необходимо для проверки производительности / емкости темы / разделов kafka)

Я не могу создать следующий код сообщения в разделы kafka параллельно.

Обращение за помощью.

Публикация нескольких сообщений одновременно в паттициях Kafka с использованием многопоточности для целей тестирования для проверки производительности

Может ли кто-нибудь помочь мне в публикации нескольких сообщений одновременно в паттициях Kafka с использованием многопоточности.

1 Ответ

0 голосов
/ 08 мая 2020

Метод send() только поместит сообщение в буфер, и сообщения будут отправлены как часть отдельного потока. По сути, это асинхронный характер производителя, который демонстрируется.

Более того, после вызова метода send() объект Future, возвращаемый этим вызовом, игнорируется, поэтому вы действительно не получите любой способ узнать, было ли ваше сообщение отправлено или нет.

Вы можете попробовать:

  1. Послать метод синхронно, позвонив:

producer.send(record).get();

Это будет ждать ответа от Kafka, прежде чем двигаться дальше, и вы получите сообщение об ошибке, если возникнут какие-либо проблемы с отправкой этого сообщения в Kafka.

или

Вызвать метод flush() после send().

Как следует из названия, этот метод передает sh сообщения в буфере, но вот справочная документация для этого, если вы хотите узнать больше.

Надеюсь, это поможет!

...