Как и когда Kafka Consumer фиксирует смещения в Kafka с помощью EIP-фреймворков, таких как Apache Camel? Как мы можем обрабатывать повторные попытки асинхронно с camel-kakfa? - PullRequest
0 голосов
/ 27 декабря 2018

Пример кода построителя маршрутизации, как показано ниже:

    // For out of seq event state (reque)
    onException(OutOfSeqStateException.class)
    .logStackTrace(false).logExhaustedMessageHistory(false)
        .setHeader("eventSource", constant(EventConstants.BACKEND))
        .delay(30000)/*.method(DelayerBean.class , "computeDelayInMillis")*/.asyncDelayed().// delay should be asynchronously
        .setBody().header(EventConstants.BE_STATE_EVENT)// send original event
        .to("direct:requeue");// toendpoint: requeroute

    // For handling other exceptions
    onException(Exception.class)
    .log("EXCEPTION OCCURED.....   ->  \"${exception.message}\"")
        .setExchangePattern(ExchangePattern.InOnly)
        .bean(KafkaErrorHandlerBean.class, "handle")
        .handled(true);

    // Backend Events Route
    from(commonCamelConfig.getKafkaConsumerEndpoint())
     .routeId("BackendStateIncomingRoute")
            .id(routeId)
            .to("log:" + fqClassName + "?showAll=true&level=" + logLevel)
            .unmarshal(jdf)
            .bean(MandatoryFieldCheckerBean.class, "performNullCheck")
            // all context info must be present,if not, throw exception
            .bean(ValidateEventHandlerBean.class, "validateIncomingEvents")
            .choice()
            .when().simple("${in.header.isValidEvent} == true",Boolean.class)//enter if valid event(backendstate/backenddata)
                    .choice()
                        .when(header("BEStateEvent").isNotNull())
                            .bean(EventTransformer.class, "getBackendTransformedEvent")
                            .bean(PaymentsService.class, "processMessage")
                            .bean(TransitionalStateHandlerBean.class,"handle")
                            .bean(AMQPProducer.class, "sendEventToMQ")
                            .setExchangePattern(ExchangePattern.InOnly)
                        .otherwise()
                            .bean(EventTransformer.class, "getBackendTransformedEvent")
                            .bean(PaymentsService.class, "processMessage")
                    .endChoice()
            .setExchangePattern(ExchangePattern.InOnly)//acknowledge only valid events, doesnt expect a reply
            .endChoice()
            .end();

    //Reque the original event in case of Retryable Exceptions
    from("direct:requeue").routeId("BackendDirectRequeRoute")
    .bean(RequestRetryHandlerBean.class, "doRetry")
    .to(commonCamelConfig.getKafkaConsumerEndpoint())
    .end();

Конфигурация Kafka для конечной точки потребителя выглядит следующим образом:

    public String getKafkaConsumerEndpoint() {
    return properties.getJmsKafkaBroker()
            + ":" + properties.getKafkaPaymentsOtpTopic()
            + "?brokers="+ properties.getBootstrapServers()
            + "&groupId="+ properties.getGroupId()
            + "&autoOffsetReset="+ properties.getAutoOffsetReset()
            + "&autoCommitEnable=true"
            + "&keyDeserializer=org.apache.kafka.common.serialization.StringDeserializer"
            + "&valueDeserializer=org.apache.kafka.common.serialization.StringDeserializer";
}

Есть два запроса, которые я имею в отношенииприведенный выше код:

  1. В какой момент всей маршрутизации потребитель kafka фиксирует смещения или является независимой задачей, поскольку я не изменил значение по умолчанию для поля autoCommitIntervalMs , что составляет 5 секунд, поэтому это означает, что он будет фиксироваться независимо каждые 5 секунд.

  2. Я хочу обработать сценарий повторного вызова в случае исключения из непоследовательности и способа, которым яЯ делаю это путем создания другой конечной точки с логикой шаблона задержки, которая будет отправлять ошибочное сообщение в DLQ по истечении максимального времени запроса.Какие лазейки в приведенной выше логике и есть ли лучший способ справиться с тем же?Если эта логика кажется хорошей, пожалуйста, обратите внимание, что я использовал asyncdelayed () для асинхронной задержки, но, похоже, она не работает и блокирует новые сообщения до тех пор, пока они не потребуются.Пожалуйста, помогите мне реализовать асинхронную задержку.

1 Ответ

0 голосов
/ 02 января 2019

1) Если вы используете верблюжий вариант <2.22, то вы не можете управлять смещением, это происходит в другом потоке со значением по умолчанию 5 секунд, которое можно изменить.Если вы используете верблюжий вариант> = 2.22, то только вы можете контролировать фиксацию сообщений вручную.Чтобы использовать ручную фиксацию, установите следующие свойства:

autoCommitEnable = false: отключить автоматическую фиксацию смещений, чтобы мы могли использовать ручную фиксацию.allowManualCommit = true: включить фиксацию вручную, дает нам доступ к возможности KafkaManualCommit.Ниже приведен фрагмент кода:

KafkaManualCommit manual =
                        exchange.getIn().getHeader(KafkaConstants.MANUAL_COMMIT, KafkaManualCommit.class);
                if (manual != null) {
                    LOGGER.info("committing the offset manually");
manual.commitSync();
            }

2) В вашем втором вопросе кажется, что вы хотите снова отправить сообщение на kafka для обработки.Но из вашего кода кажется, что вы используете одну и ту же конечную точку для потребителя и производителя.Когда вы хотите создать сообщение в кафке, вам нужно указать «тему», «раздел» и «ключ» сообщения, которого я не вижу в вашем коде.Говоря о дырах в петлях, потому что вы снова помещаете сообщения в kafka, что, если сообщение было повреждено, поэтому вы будете продолжать получать то же исключение и снова помещать то же сообщение n снова обратно в kafka.Я бы предложил повторить сообщение по тому же маршруту.Ниже приведен фрагмент кода:

onException(YourException.class)
                .maximumRedeliveries(3) // You can call some method too
                .redeliveryDelay(100) // You can call some method too
                .onRedelivery(exchange -> {
                    int retryCount = exchange.getIn().getHeader(Exchange.REDELIVERY_COUNTER, Integer.class);
                    log.debug("Recoverable exception occurred. Retried {} time " , retryCount);
                })
                .retryAttemptedLogLevel(LoggingLevel.DEBUG)
                .to("someOtherRoute // Probably to error-topic
...