EIP / Apache Camel - Как обрабатывать сообщения одновременно, но атомарно для каждой группы? - PullRequest
0 голосов
/ 14 декабря 2018

У меня следующая ситуация:

  • Существует фиксированное количество групп.
  • Существует поток входящих сообщений TCP.Каждое сообщение относится только к одной группе.

Я запускаю маршрут Camel следующим образом:

public class MyMessage implements Runnable {
    public void run() {
        // omitted here
    }
}

from("netty:tcp://localhost:7777?textline=true&sync=false")
   ... // omitted here: parse message to pojo MyMessage, set header "group-identifier"
   .to(seda:process);

Этот маршрут Camel использует поток TCP, анализирует и преобразует полезную нагрузку каждоговходящее сообщение для MyMessage pojo и устанавливает заголовок group-identifier на обмене, который соответствует сообщению ...

Теперь я хочу использовать seda:process следующим образом:

  • Сообщения, принадлежащие одной и той же группе, не могут быть выполнены одновременно.
  • Сообщения, принадлежащие разным группам, могут выполняться одновременно.
  • Каждое сообщение должно быть выполнено путем вызова run().Я хочу предоставить / определить ExecutorService для этого, чтобы я мог контролировать количество потоков.

Какие шаблоны интеграции предприятия я могу применить здесь?Как мне сопоставить эти понятия с верблюдом?

Я узнал, что ActiveMQ имеет концепцию групп сообщений (http://activemq.apache.org/message-groups.html).. Это может обеспечить способ гарантировать, что два сообщения одной группы никогда не будутвыполняется в то же время. Хотя я не уверен, что внедрение ActiveMQ только для этого не является излишним. Может ли это быть достигнуто с помощью «основного» Camel / Java?

1 Ответ

0 голосов
/ 15 декабря 2018

Это довольно легко сделать в ActiveMQ.Следующий фрагмент кода имитирует выполнение сообщений по мере необходимости:

  • Сообщения, принадлежащие к той же группе, выполняются последовательно.
  • Сообщения, принадлежащие разным группам, выполняются одновременно.

Это опирается на группы сообщений ActiveMQ, как объяснено на http://activemq.apache.org/message-groups.html.

final CamelContext context = new DefaultCamelContext();

context.addComponent("activemq", ActiveMQComponent.activeMQComponent("vm://localhost?broker.persistent=false"));
context.addRoutes(new RouteBuilder() {
    @Override
    public void configure() {
        from("activemq:queue:q?concurrentConsumers=5")
                .process(exchange -> {
                    System.out.println(Thread.currentThread() + " - " + exchange.getIn().getBody());
                    Thread.sleep(5000);
                });
    }
});
context.start();

for (int i = 0; i < 1000; ++i) {
    context.createFluentProducerTemplate()
            .withBody("This is a message from group : " + (i % 5))
            .withHeader("JMSXGroupID", "" + (i % 5))
            .to("activemq:queue:q")
            .send();
}

Тем не менее, мне (все еще) интересно, можно ли это сделать с помощью чистого EIP / Camel-core.

...