Верблюд - собирать статистику по параллельным потребителям седы: // маршрут - PullRequest
0 голосов
/ 09 ноября 2018

Я пытаюсь собрать статистику времени выполнения для одновременных потребителей, подключенных к маршруту Camel seda: //. Каждый потребитель использует один и тот же Bean Processor Bean из Spring-прототипа Bean, так как я хочу, чтобы каждый потребитель работал независимо в своем потоке. Документация Camel гласит, что использование seda: // и одновременных потребителей будет запускать каждого потребителя в отдельном потоке. Кажется, это работает нормально, когда я запускаю процесс, я вижу 4 потока SyncQueueProcessor.

Для сбора статистики у меня есть отдельный Spring Bean (Singleton), который я могу автоматически подключить к «прототипу» процессора Camel, а затем у меня есть код в процессоре для отслеживания времени выполнения. Я использую текущий идентификатор потока, чтобы идентифицировать каждого параллельного потребителя. Процессор: я установил routeId в методе установки @Autowired единственного компонента ProcessStatistics. Я думаю, что метод создания вызывается ONCE при создании прототипа.

Верблюжий путь:

@Component
public class EnrichDocumentRoute extends RouteBuilder {

    private SyncQueueProcessor      syncQueueProcessor;

    @Value("${data-sync.processor.concurrency: 3}")
    private int concurrency;

    @Override
    public void configure() throws Exception {
        from("seda:processQueue?concurrentConsumers=" + concurrency)
            .routeId("enrichment")
            .log(LoggingLevel.DEBUG, "##### Started Enrichment Process for : ${body.billingDocumentIdentifier}")
            .process(this.getSyncQueueProcessor())
            .log(LoggingLevel.DEBUG, "##### Finished Enrichment Process for: ${body.billingDocumentIdentifier}")
            .end()
        ;
    }

    @Autowired
    public void setSyncQueueProcessor(SyncQueueProcessor syncQueueProcessor) {
        this.syncQueueProcessor = syncQueueProcessor;
    }

    public SyncQueueProcessor getSyncQueueProcessor() {
        counter++;
        LOG.info("getSyncQueueProcessor() - Factory sync-consumer [" + counter + "]");
        //return this.syncQueueProcessorFactory.getObject("sync-consumer-"+this.counter);
        //return new SyncQueueProcessor("sync-consumer-"+this.counter);
        //SyncQueueProcessor    lnewProcessor = new SyncQueueProcessor();
        //lnewProcessor.setRouteId("sync-consumer-"+this.counter);
        //return lnewProcessor;
        //return new SyncQueueProcessor();
        return syncQueueProcessor;
    }

Верблюжий процессор "прототип": SyncQueueProcessor

@Component
@Scope("prototype")
@Qualifier("syncQueueProcessor")
public class SyncQueueProcessor implements Processor {

    // BEAN : Spring Singleton Bean To collect statistics 
    private ProcessStatistics       processStatistics;

    // POJO : Collects the Time statistics, POJO is added to the
    // Spring Singleton bean in the setter method... 
    private ProcessStatisticsBean   processStatisticsBean;

    private String                  routeId;

    @Autowired
    public void setProcessStatistics(ProcessStatistics setValue) {
        this.processStatistics = setValue;
        this.processStatisticsBean = new ProcessStatisticsBean ();
        getProcessStatisticsBean().setRouteId("sync-consumer-" + Thread.currentThread().getId());
        getProcessStatisticsBean().setEndpointId(getRouteId());
        this.processStatistics.addItem(this.processStatisticsBean);
    }

Проблема, с которой я сталкиваюсь, заключается в том, что при запуске процесса я вижу только одного sync-consumer-23 ... Я бы ожидал увидеть в моей статистике 4 маршрута sync-consumer-nn. Для единственного синхронизирующего процессора nn я вижу в ProcessStatistics, что статистика Времени собирается.

Но почему не появляются 3 других потребителя? Я думаю, что «прототип» Scope позволил бы запускать метод сеттера ОДИН РАЗ для каждого Бина ... следовательно, каждый находится в своем собственном потоке.

Вещи, которые я пытался: - Установка routeId в начале класса ... без разницы.

tia, adym

ОБНОВЛЕНИЕ: Ну, я думаю, что я ответил на один вопрос, потому что я вижу только 1 в bean-компоненте stats, потому что сеттер называется ТОЛЬКО ОДИН РАЗ ... После прочтения этого поста, Как позволить Spring initialize "прототип "бобы, только когда он получен через getBean ()? , я предполагаю, что верблюжий маршрут это то, что вызывает это ... то есть Синглтон-бин (Camel Route) вызывает инициализацию прототипа.

Возможно, лучший вопрос: как я могу запустить инициализацию компонента-прототипа после назначения потока?

ОБНОВЛЕНИЕ: Попробовал использовать ObjectProvider, как описано в этом посте ... те же результаты: Spring Java Config: как создать @Bean с прототипом с аргументами времени выполнения? . Также другие неудачные попытки (закомментированные) в EnrichDocumentRoute.getSyncQueueProcessor () ...

...