У меня проблема с Apache Camel Kafka Producer. Я публикую пример из двух подходов. Мы должны обрабатывать 5 миллионов записей в час. с конфигурацией OSE 2 ГБ ОЗУ, 2 CORE на POD.
JacksonDataFormat jsonOut = new JacksonDataFormat(TOPICNAME.class);
from("kafka:{{consumer.topic.TOPICNAME}}?brokers={{kafka_dev.host}}"
+ "&maxPollRecords={{consumer.maxPollRecords}}" + "&consumersCount={{consumer.consumersCount}}"
+ "&seekTo={{consumer.seekTo}}" + "&groupId={{consumer.group}}" + "&keyDeserializer=" + KEYDESERIALIZER
+ "&valueDeserializer=" + VALUEDESERIALIZER
+ SSL)
.doTry().routeId("FromKafka-TOPICNAME")
.process(new CamelTranslator("TOPICNAME")).marshal(jsonOut)
.to("seda:producertopic")
.log("Sent\t"+ "TOPICNAME"+"\t"+" - Sent Timestamp:"+ "simple(${date:now:yyyy-MM-dd HH:mm:ss.SSS})")
.doCatch(Exception.class).process(new ExceptionProcessor());
from("seda:producertopic")
.to("kafka:{{producer.topic.topicname}}?brokers={{kafka_dev.host}}" + SSL);
В этом есть SEDA, с некоторыми пользовательскими потоками (10) и max reocords (10), counsumercount (2), мы смогли обрабатывать 60 сообщений как таковых c в течение 5 минут, после чего он начал выдавать исключение «Очередь заполнена».
JacksonDataFormat jsonOut = new JacksonDataFormat(TOPICNAME.class);
from("kafka:{{consumer.topic.TOPICNAME}}?brokers={{kafka_dev.host}}"
+ "&maxPollRecords={{consumer.maxPollRecords}}" + "&consumersCount={{consumer.consumersCount}}"
+ "&seekTo={{consumer.seekTo}}" + "&groupId={{consumer.group}}" + "&keyDeserializer=" + KEYDESERIALIZER
+ "&valueDeserializer=" + VALUEDESERIALIZER
+ SSL)
.log("Received\t"+ "S_DOC_QUOTE"+"\t"+" - Recived Timestamp:"+ "simple(${date:now:yyyy-MM-dd HH:mm:ss.SSS})")
.doTry().routeId("FromKafka-TOPICNAME")
.log("Received_1\t"+ "S_DOC_QUOTE"+"\t"+" - Recived Timestamp:"+ "simple(${date:now:yyyy-MM-dd HH:mm:ss.SSS})")
.process(new CamelTranslator("TOPICNAME")).marshal(jsonOut)
.log("Received_2\t"+ "S_DOC_QUOTE"+"\t"+" - Recived Timestamp:"+ "simple(${date:now:yyyy-MM-dd HH:mm:ss.SSS})")
.to("direct:producertopic")
.log("Sent\t"+ "TOPICNAME"+"\t"+" - Sent Timestamp:"+ "simple(${date:now:yyyy-MM-dd HH:mm:ss.SSS})")
.doCatch(Exception.class).process(new ExceptionProcessor());
from("direct:producertopic")
.to("kafka:{{producer.topic.topicname}}?brokers={{kafka_dev.host}}" + SSL);
Это прямое сообщение. Мы можем обрабатывать 3 записи в секунду. Продюсер берет 400 милис c при отправке каждой записи.
FromKafka-TOPICNAME: Received TOPICNAME- Recived Timestamp:simple(2020-01-13 18:34:35.756)
FromKafka-TOPICNAME:Received_1 TOPICNAME-Recived Timestamp:simple(2020-01-13 18:34:35.757)
FromKafka-TOPICNAME:Received_2 TOPICNAME-Recived Timestamp:simple(2020-01-13 18:34:35.761)
FromKafka-TOPICNAME: Sent TOPICNAME - Recived Timestamp:simple(2020-01-13 18:34:36.089)