Apache Camel Producer очень медленный - PullRequest
0 голосов
/ 14 января 2020

У меня проблема с Apache Camel Kafka Producer. Я публикую пример из двух подходов. Мы должны обрабатывать 5 миллионов записей в час. с конфигурацией OSE 2 ГБ ОЗУ, 2 CORE на POD.

 JacksonDataFormat jsonOut = new JacksonDataFormat(TOPICNAME.class);

        from("kafka:{{consumer.topic.TOPICNAME}}?brokers={{kafka_dev.host}}"
         + "&maxPollRecords={{consumer.maxPollRecords}}" + "&consumersCount={{consumer.consumersCount}}"
         + "&seekTo={{consumer.seekTo}}" + "&groupId={{consumer.group}}" + "&keyDeserializer=" + KEYDESERIALIZER
         + "&valueDeserializer=" + VALUEDESERIALIZER
         + SSL)
         
        .doTry().routeId("FromKafka-TOPICNAME")
        .process(new CamelTranslator("TOPICNAME")).marshal(jsonOut)
        .to("seda:producertopic")
        .log("Sent\t"+ "TOPICNAME"+"\t"+" - Sent    Timestamp:"+ "simple(${date:now:yyyy-MM-dd HH:mm:ss.SSS})")
        .doCatch(Exception.class).process(new ExceptionProcessor());



        from("seda:producertopic")
        .to("kafka:{{producer.topic.topicname}}?brokers={{kafka_dev.host}}" + SSL);



 

В этом есть SEDA, с некоторыми пользовательскими потоками (10) и max reocords (10), counsumercount (2), мы смогли обрабатывать 60 сообщений как таковых c в течение 5 минут, после чего он начал выдавать исключение «Очередь заполнена».

 

    JacksonDataFormat jsonOut = new JacksonDataFormat(TOPICNAME.class);
    from("kafka:{{consumer.topic.TOPICNAME}}?brokers={{kafka_dev.host}}"
     + "&maxPollRecords={{consumer.maxPollRecords}}" + "&consumersCount={{consumer.consumersCount}}"
     + "&seekTo={{consumer.seekTo}}" + "&groupId={{consumer.group}}" + "&keyDeserializer=" + KEYDESERIALIZER
     + "&valueDeserializer=" + VALUEDESERIALIZER
     + SSL)
     .log("Received\t"+ "S_DOC_QUOTE"+"\t"+" - Recived Timestamp:"+ "simple(${date:now:yyyy-MM-dd HH:mm:ss.SSS})")
    .doTry().routeId("FromKafka-TOPICNAME")
    .log("Received_1\t"+ "S_DOC_QUOTE"+"\t"+" - Recived Timestamp:"+ "simple(${date:now:yyyy-MM-dd HH:mm:ss.SSS})")
    .process(new CamelTranslator("TOPICNAME")).marshal(jsonOut)
    .log("Received_2\t"+ "S_DOC_QUOTE"+"\t"+" - Recived Timestamp:"+ "simple(${date:now:yyyy-MM-dd HH:mm:ss.SSS})")
    .to("direct:producertopic")
    .log("Sent\t"+ "TOPICNAME"+"\t"+" - Sent    Timestamp:"+ "simple(${date:now:yyyy-MM-dd HH:mm:ss.SSS})")
    .doCatch(Exception.class).process(new ExceptionProcessor());





    from("direct:producertopic")
    .to("kafka:{{producer.topic.topicname}}?brokers={{kafka_dev.host}}" + SSL);

Это прямое сообщение. Мы можем обрабатывать 3 записи в секунду. Продюсер берет 400 милис c при отправке каждой записи.

 FromKafka-TOPICNAME: Received TOPICNAME- Recived Timestamp:simple(2020-01-13 18:34:35.756)
 FromKafka-TOPICNAME:Received_1 TOPICNAME-Recived Timestamp:simple(2020-01-13 18:34:35.757)
 FromKafka-TOPICNAME:Received_2 TOPICNAME-Recived Timestamp:simple(2020-01-13 18:34:35.761)
 FromKafka-TOPICNAME: Sent TOPICNAME    - Recived Timestamp:simple(2020-01-13 18:34:36.089)
...