Я строю производителя Apache kafka, который потребляется потребителем flink Kafka.Мне нужно генерировать 1 миллион до 10 миллионов сообщений в секунду.Однако сейчас я получаю очень небольшое количество записей в секунду (до 2000 в секунду на раздел).У меня есть кластер с 3 брокерами и 30 ГБ памяти в каждом.В теме также 10 разделов.Любые рекомендации, пожалуйста?
Вот мой код производителя
public class TempDataGenerator implements Runnable {
private String topic = "try";
private String bootStrap_Servers = "kafka-node-01:9092,kafka-node-02:9092,kafka-node-03:9092";
public static void main(String[] args) {
ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
executor.execute(new TempDataGenerator());
}
public TempDataGenerator() {
}
private Producer<String, String> createProducer() {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
bootStrap_Servers);
props.put(ProducerConfig.CLIENT_ID_CONFIG, "KafkaExampleProducer");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName());
props.put(ProducerConfig.ACKS_CONFIG,"0");
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG,"5000000000");
props.put(ProducerConfig.BATCH_SIZE_CONFIG,"100000");
return new KafkaProducer<>(props);
}
public void run() {
final Producer<String, String> producer = createProducer();
Socket soc = null;
try {
boolean active = true;
int generatedCount = 0,tempUserID=1;//the minimum tuple that any thread can generate
while (active) {
generatedCount = 0;
/**
* generate per second
*/
for (long stop = Instant.now().getMillis()+1000; stop > Instant.now().getMillis(); ) { //generate tps
String msg = "{ID:" + generatedCount + ", msg: "+Instant.now().getMillis()+"}";
final ProducerRecord<String, String> record = new ProducerRecord<>(topic, null, msg);
RecordMetadata metadata = producer.send(record).get();
producer.flush();
generatedCount++;
}
}
} catch (InterruptedException e) {
e.printStackTrace();
} catch (Exception e) {
e.printStackTrace();
}
}
}