как выполнить параллельную обработку сообщений gcp pubsub в apache верблюде - PullRequest
1 голос
/ 29 апреля 2020

У меня есть этот код ниже, который принимает сообщение из источника pubsub topi c -> преобразует его согласно шаблону -> затем публикует sh преобразованное сообщение в целевой topi c.

Но для повышения производительности мне нужно выполнить эту задачу параллельно. То есть мне нужно опросить 500 сообщений, а затем преобразовать их параллельно и затем опубликовать sh их в целевой топи c.

Из документация по компоненту верблюда gcp, я полагаю, что maxMessagesPerPoll и параметр concurrentConsumers сделают эту работу. Из-за отсутствия документации я не уверен, как она работает внутри.

Я имею в виду он создает параллельный маршрут 500, который будет обрабатывать сообщения и публиковать sh его в целевой топи c б) как насчет упорядочения сообщений c) я должен рассматривать параллельную обработку EIP как альтернативу

et c.

Концепция мне не ясна

Был go

// my route
private void addRouteToContext(final PubSub pubSub) throws Exception {

    this.camelContext.addRoutes(new RouteBuilder() {
        @Override
        public void configure() throws Exception {

            errorHandler(deadLetterChannel("google-pubsub:{{gcp_project_id}}:{{pubsub.dead.letter.topic}}")
                    .useOriginalMessage().onPrepareFailure(new FailureProcessor()));




            /*
             * from topic
             */
            from("google-pubsub:{{gcp_project_id}}:" + pubSub.getFromSubscription() + "?"
                    + "maxMessagesPerPoll={{consumer.maxMessagesPerPoll}}&"
                    + "concurrentConsumers={{consumer.concurrentConsumers}}").
            /*
             * transform using the velocity
             */
            to("velocity:" + pubSub.getToTemplate() + "?contentCache=true").
            /*
             * attach header to the transform message
             */
            setHeader("Header ", simple("${date:now:yyyyMMdd}")).routeId(pubSub.getRouteId()).
            /*
             * log the transformed event
             */
            log("${body}").
            /*
             * publish the transformed event to the target topic
             */
            to("google-pubsub:{{gcp_project_id}}:" + pubSub.getToTopic());
        }
    });
}

Ответы [ 2 ]

0 голосов
/ 30 апреля 2020

a) если я опрошу, скажем, 500 сообщений, то он создаст 500 параллельных маршрутов, которые обработают сообщения и опубликуют sh его до цели topi c

Нет В этом случае Camel не создает 500 параллельных потоков. Как вы подозреваете, число одновременных потоков потребителей устанавливается с concurrentConsumers. Таким образом, если вы определите 5 concurrentConsumers с maxMessagesPerPoll, равным 500, каждый потребитель получит до 500 сообщений и обработает их одно за другим в одном потоке. То есть у вас параллельно обрабатывается 5 сообщений.

как насчет порядка сообщений

Как только вы обрабатываете сообщения параллельно, порядок сообщений нарушается вверх. Но это уже случается с 1 потребителем, когда вы получили ошибки обработки, и они обращаются к вашему deadLetterChannel и обработаны позже.

, если я рассматриваю EIP параллельной обработки в качестве альтернативы

Только если опции concurrentConsumers недостаточно.

0 голосов
/ 30 апреля 2020

Когда вы упоминаете опцию concurrentConsumers (скажем, concurrentConsumers=10), вы просите Camel создать пул потоков из 10 потоков, и каждый из этих 10 потоков будет получать свое сообщение из очереди pub-sub. и обработать их.

Здесь следует отметить, что при указании параметра concurrentConsumers пул потоков использует фиксированный размер, что означает, что фиксированное число активных потоков все время ожидает обработки входящих Сообщения. Таким образом, 10 потоков (так как я указал concurrentConsumers = 10) будут ожидать обработки моих сообщений, даже если не поступает одновременно 10 сообщений.

Очевидно, это не гарантирует, что входящие сообщения будут обрабатываться в том же порядке. Если вы хотите, чтобы сообщения были в одном и том же порядке, вы можете взглянуть на Resequencer EIP , чтобы упорядочить ваши сообщения.

Что касается вашего третьего вопроса, я не думаю, что Компонент google-pubsub позволяет использовать параллельную обработку. Вы можете сделать свой собственный, используя Threads EIP . Это определенно даст больший контроль над вашим параллелизмом.

Используя потоки, ваш код будет выглядеть примерно так:

from("google-pubsub:project-id:destinationName?maxMessagesPerPoll=20")
// the 2 parameters are 'pool size' and 'max pool size'
.threads(5, 20)
.to("direct:out");
...