Apache camel-K: проблема с созданием генератора ActiveMQ - PullRequest
0 голосов
/ 13 февраля 2020

Я новичок в Apache верблюде и даже больше в Camel-K. Я строю стек в контейнере Kubernetes с очередью сообщений ActiveMQ в качестве входящего интерфейса с внешними источниками данных, InfluxDB в качестве хранилища данных и маршрутом Camel-K для маршрутизации сообщений от AMQ к InfluxDB. Помимо маршрута Camel-K, система работает нормально. Я открываю порт 61616 ActiveMQ для кластера через конечную точку stati c -IP, назначенную службе:

apiVersion: v1
kind: Service
metadata:
  name: activemq-external
spec:
  selector:
    app: activemq
  type: NodePort
  ports:
    - name: port-service-console
      port: 8161
      targetPort: 8161
      nodePort: 8161
      protocol: TCP
---
# Service definition for internal service with static IP (which can be used in camel-K integration "ActiveMQToInfluxDB.yaml")
# see https://kubernetes.io/docs/concepts/services-networking/service/#service-resource
apiVersion: v1
kind: Service
metadata:
  name: activemq-internal
spec:
#  type: NodePort
  ports:
    - name: port-internal
      port: 61616
      targetPort: 61616
#      nodePort: 61616
      protocol: TCP
---
apiVersion: v1
kind: Endpoints
metadata:
  name: activemq-internal
subsets:
  - addresses:
      - ip: 172.17.0.8
    ports:
      - port: 61616

Следовательно, я ожидаю, что порт должен быть доступен для других модулей в кластере через 172.17.0.8 : 61616.

Я запускаю маршрут camel-K по команде (не обращайте внимания на зависимость от apache commons, мне это нужно для работы со строками)

kamel run kamel-integrations/ActiveMQToInfluxDbRoute.java -d mvn:org.apache.commons:commons-lang3:3.9 -d mvn:org.influxdb:influxdb-java:2.17 -d mvn:org.apache.activemq:activemq-camel:5.15.11 -d mvn:org.apache.camel:camel-core:2.25.0

java код интеграции выглядит следующим образом:

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.camel.BindToRegistry;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.Message;
import org.apache.commons.lang3.StringUtils;
import org.influxdb.dto.BatchPoints;
import org.influxdb.dto.Point;
//import org.springframework.context.annotation.Bean;
//import org.springframework.context.annotation.Configuration;

public class ActiveMQToInfluxDbRoute extends RouteBuilder {

    private static final String MESSAGE_HEADER = "InfluxTimestamp";

    @BindToRegistry
    public ActiveMQConnectionFactory registerActiveMQConnectionFactory() {
        System.out.println("ActiveMQ Listener: STARTING...");
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
        connectionFactory.setBrokerURL("tcp://172.17.0.8:61616");
        connectionFactory.setUserName("admin");
        connectionFactory.setPassword("admin");
        connectionFactory.setUseAsyncSend(false);
        connectionFactory.setClientID("Influx Message Queue");
        connectionFactory.setConnectResponseTimeout(300);
        System.out.println("ActiveMQ Listener: STARTED");
        return connectionFactory;
    }

    @Override
    public void configure() throws Exception {

    String sourceString = "activemq:queue:cryring_db_inbound?brokerURL=tcp://172.17.0.8:61616";
    String targetString = "influxdb://influxDb?databaseName=my_database=true&retentionPolicy=default";

    from(sourceString) //
      .process(messagePayload -> {
           String manipulation stuff
      })//
     .to(targetString) //
     .onException(Exception.class) //
     .useOriginalMessage() //
     .handled(true) //
     .log("error") //
     .to("stream:out");
    }

Не удается выполнить код java из-за исключения:

Route(route1)[From[activemq:queue:my_database?brokerU... because of Failed to resolve endpoint: activemq://queue:my_database?brokerURL=tcp%3A%2F%2F172.17.0.8%3A61616 due to: java.lang.IllegalArgumentException: wrong number of arguments

Я уже нашел этот пост с похожей проблемой : https://issues.apache.org/jira/browse/CAMEL-13145 но я не могу понять, что мне делать.

1 Ответ

0 голосов
/ 13 февраля 2020

Я смог решить это сам:

Как указано в посте https://issues.apache.org/jira/browse/CAMEL-13145, и что я не понял в первую очередь, зависимость -d mvn:org.apache.activemq:activemq-camel:5.15.11 вызывает проблема. Проблема ActiveMQ исчезнет, ​​если я запусту Camel-K следующим образом:

kamel run kamel-integrations/ActiveMQToInfluxDbRoute.java -d mvn:org.apache.commons:commons-lang3:3.9 -d mvn:org.influxdb:influxdb-java:2.17 -d mvn:org.apache.camel:camel-activemq:3.0.1 -d mvn:org.apache.camel:camel-core:2.25.0
...