Используйте обмен сообщения внутри метода .to () в Apache Camel - PullRequest
0 голосов
/ 03 апреля 2019

Я новичок в верблюде и хотел бы динамически изменить свой маршрут в соответствии с какой-то логикой, предварительно сформированной перед раздачей

camelContext.addRoutes(new RouteBuilder() {
        public void configure() {
            PropertiesComponent pc = getContext().getComponent("properties", PropertiesComponent.class);
            pc.setLocation("classpath:application.properties");

            log.info("About to start route: Kafka Server -> Log ");

            from("kafka:{{consumer.topic}}?brokers={{kafka.host}}:{{kafka.port}}"
                    + "&maxPollRecords={{consumer.maxPollRecords}}"
                    + "&consumersCount={{consumer.consumersCount}}"
                    + "&seekTo={{consumer.seekTo}}"
                    + "&groupId={{consumer.group}}"
                    + "&valueDeserializer=" + BytesDeserializer.class.getName())
                    .routeId("FromKafka")
                    .process(new Processor() {
                        @Override
                        public void process(Exchange exchange) throws Exception {
                            System.out.println(" message: " + exchange.getIn().getBody());
                            Bytes body = exchange.getIn().getBody(Bytes.class);
                            HashMap data = (HashMap)SerializationUtils.deserialize(body.get());
                            // do some work on data;
                            Map messageBusDetails = new HashMap();
                            messageBusDetails.put("topicName", "someTopic");
                            messageBusDetails.put("producerOption", "bla");
                            exchange.getOut().setHeader("kafka", messageBusDetails);
                            exchange.getOut().setBody(SerializationUtils.serialize(data));
                        }
                    }).choice()
                        .when(header("kafka"))
                            .to("kafka:"+ **getHeader("kafka").get("topicName")** )
                .log("${body}");
        }
    });

getHeader ( "Кафка"). Получить ( "topicName")

это то, чего я пытаюсь достичь.

Но я не знаю, как получить доступ к значению заголовков (которое является картой - потому что у производителя kafka может быть больше конфигурации) внутри .to ()

Я понимаю, что могу использовать его совершенно неправильно ... Но это то, что мне удалось понять до сих пор ...

Основная цель - иметь несколько шин сообщений как .from () и несколько параметров шины сообщений в .to (), которые будут определены через внешний источник (например, файл конфигурации) таким образом, что один и тот же маршрут будет применяться ко многим логическим сценариям и я думал, что выбор () является лучшим ответом Спасибо!

1 Ответ

0 голосов
/ 04 апреля 2019

Вместо to () , вы можете использовать toD () , что является «Динамическим до» См. для получения подробной информации

Синтаксис для извлечения различных заголовков и т. Д. См. На странице Простое выражение

...