Возможна ли единственная доставка через spring-cloud-stream-binder-kafka или spring-kafka - PullRequest
0 голосов
/ 05 июля 2018

Я пытаюсь добиться единовременной доставки с использованием spring-cloud-stream-binder-kafka в приложении весенней загрузки Версии, которые я использую:

  • весна-облако поток-связующее-Кафка-ядро-1.2.1.RELEASE
  • пружинно-облака потока связующего вещества-Кафка-1.2.1.RELEASE
  • spring-cloud-stream-codec-1.2.2.RELEASE spring-kafka-1.1.6.RELEASE
  • весна-интегрально-Кафка-2.1.0.RELEASE
  • spring -gration-core-4.3.10.RELEASE
  • Zookeeper-3.4.8
  • Кафка версия: 0.10.1.1

Это моя конфигурация (cloud-config):

    spring:
      autoconfigure:
        exclude: org.springframework.cloud.netflix.metrics.servo.ServoMetricsAutoConfiguration
      kafka:
        consumer:
          enable-auto-commit: false
      cloud:
        stream:
          kafka:
            binder:
              brokers: "${BROKER_HOST:xyz-aws.local:9092}"
              headers:
                - X-B3-TraceId
                - X-B3-SpanId
                - X-B3-Sampled
                - X-B3-ParentSpanId
                - X-Span-Name
                - X-Process-Id
              zkNodes: "${ZOOKEEPER_HOST:120.211.316.261:2181,120.211.317.252:2181}"
            bindings:
              feed_platform_events_input:
                consumer:
                  autoCommitOffset: false
          binders:
            xyzkafka:
              type: kafka
          bindings:
            feed_platform_events_input:
              binder: xyzkafka
              destination: platform-events
              group: br-platform-events

У меня есть два основных класса: Интерфейс FeedSink:

package au.com.xyz.proxy.interfaces;
import org.springframework.cloud.stream.annotation.Input; 
import org.springframework.messaging.MessageChannel;

public interface FeedSink {

String FEED_PLATFORM_EVENTS_INPUT = "feed_platform_events_input";

@Input(FeedSink.FEED_PLATFORM_EVENTS_INPUT)
MessageChannel feedlatformEventsInput();
} 

EventConsumer

package au.com.xyz.proxy.consumer;

@Slf4j
@EnableBinding(FeedSink.class)
public class EventConsumer {

    public static final String SUCCESS_MESSAGE =
            "SEND-SUCCESS : Successfully sent message to platform.";
    public static final String FAULT_MESSAGE = "SOAP-FAULT Code: {}, Description: {}";
    public static final String CONNECT_ERROR_MESSAGE = "CONNECT-ERROR Error Details: {}";
    public static final String EMPTY_NOTIFICATION_ERROR_MESSAGE =
            "EMPTY-NOTIFICATION-ERROR Empty Event Received from platform";

    @Autowired
    private CapPointService service;

    @StreamListener(FeedSink.FEED_PLATFORM_EVENTS_INPUT)
    /**
     * method associated with stream to process message.
     */
    public void message(final @Payload EventNotification eventNotification,
                        final @Header(KafkaHeaders.ACKNOWLEDGMENT) Acknowledgment acknowledgment) {

        String caseMilestone = "UNKNOWN";
        if (!ObjectUtils.isEmpty(eventNotification)) {
            SysMessage sysMessage = processPayload(eventNotification);
            caseMilestone = sysMessage.getCaseMilestone();
            try {
                ClientResponse response = service.sendPayload(sysMessage);
                if (response.hasFault()) {
                    Fault faultDetails = response.getFaultDetails();
                    log.error(FAULT_MESSAGE, faultDetails.getCode(), faultDetails.getDescription());
                } else {
                    log.info(SUCCESS_MESSAGE);
                }
                acknowledgment.acknowledge();
            } catch (Exception e) {
                log.error(CONNECT_ERROR_MESSAGE, e.getMessage());
            }
        } else {
            log.error(EMPTY_NOTIFICATION_ERROR_MESSAGE);
            acknowledgment.acknowledge();
        }
    }



    private SysMessage processPayload(final EventNotification eventNotification) {
        Gson gson = new Gson();
        String jsonString =  gson.toJson(eventNotification.getData());
        log.info("Consumed message for platform events with payload : {} ", jsonString);
        SysMessage sysMessage = gson.fromJson(jsonString, SysMessage.class);
        return sysMessage;
    }
    }

Я установил свойство autocommit для Kafka и подпружиненного контейнера как false. если вы видите в классе EventConsumer, я использовал Acknowledge в тех случаях, когда I service.sendPayload успешен, и нет никаких исключений. И я хочу, чтобы контейнер перемещал смещение и опрашивал следующие записи. То, что я заметил, это:

  • Сценарий 1 - в случае, если выброшено исключение и на кафке не публикуются новые сообщения. Нет повторной попытки обработать сообщение, и кажется, что нет никакой активности. Даже если основная проблема решена. Проблема, о которой я говорю, - недоступность сервера нисходящего потока. Есть ли способ повторить обработку n раз, а затем сдаться. Обратите внимание, что это повторная попытка обработки или повторная загрузка из последнего зафиксированного смещения. Это не про Кафку, экземпляр недоступен. Если я перезапускаю службу (экземпляр EC2), то обработка происходит со смещения, где было выполнено последнее успешное подтверждение.

  • Сценарий 2 - В случае, когда возникла исключительная ситуация, а затем последующее сообщение отправляется на кафку. Я вижу, что новое сообщение обработано, а смещение перемещено. Это означает, что я потерял сообщение, которое не было подтверждено. Так что вопрос в том, справился ли я с Подтверждением. Как мне контролировать чтение из последнего коммита, а не только последнее сообщение и его обработку. Я предполагаю, что происходит внутренний опрос, и он не принял во внимание или не знал о том, что последнее сообщение не было подтверждено. Я не думаю, что есть несколько потоков, читающих из Кафки. Я не знаю, как контролируются аннотации @Input и @StreamListener. Я предполагаю, что поток управляется свойством consumer.concurrency, которое управляет потоком, и по умолчанию оно установлено в 1.

Итак, я провел исследование и нашел много ссылок, но, к сожалению, ни одна из них не отвечает на мои конкретные вопросы. Я посмотрел на (https://github.com/spring-cloud/spring-cloud-stream/issues/575) который имеет комментарий от Мариуса (https://stackoverflow.com/users/809122/marius-bogoevici):

Обратите внимание, что Kafka не обеспечивает индивидуальную проверку сообщений, которая означает, что подтверждение приводит к обновлению последней потребленной смещение к смещению подтвержденного сообщения (по теме / разделу). Тот означает, что если вы получаете сообщения из одной темы порядка, сообщение может «подтвердить» все сообщения перед ним.

не уверен, что это проблема с заказом, когда есть одна нить.

Извиняюсь за длинный пост, но я хотел предоставить достаточно информации. Главное, я пытаюсь избежать потери сообщений при использовании kafka и пытаюсь понять, может ли Spring-cloud-stream-binder-kafka справиться с этой задачей, или мне нужно искать альтернативы.

Обновление от 6 июля 2018 года

Я видел этот пост https://github.com/spring-projects/spring-kafka/issues/431 Это лучший подход к моей проблеме? Могу попробовать последнюю версию spring-kafka

@KafkaListener(id = "qux", topics = "annotated4", containerFactory = "kafkaManualAckListenerContainerFactory",
                containerGroup = "quxGroup")
public void listen4(@Payload String foo, Acknowledgment ack, Consumer<?, ?> consumer) {
  • Поможет ли это в управлении смещением, которое будет установлено на место последнего успешно обработанная запись? Как я могу сделать это из прослушивания метод. consumer.seekToEnd (); и как прослушать сброс метода для получения этой записи?
  • Помещает ли Потребитель подпись в поддержку обращаться с потребителем? Или мне нужно сделать что-нибудь еще?
  • Должен ли я использовать Acknowledge или consumer.commitSyncy ()
  • Какое значение контейнераFactory. я должен определить это как боб.
  • Нужны ли @EnableKafka и @Configuration для вышеуказанного подхода к работе? Учитывая, что приложение является приложением Spring Boot.
  • Добавляя Consumer для метода прослушивания, мне не нужно реализовывать ConsumerAware Interface?

И последнее, но не менее важное: возможно ли привести пример вышеприведенного подхода, если это возможно?

Обновление от 12 июля 2018 года

Спасибо Гэри (https://stackoverflow.com/users/1240763/gary-russell) за предоставленную подсказку по использованию maxAttempts. Я использовал этот подход. И я могу добиться доставки только один раз и сохранить порядок сообщения.

Моя обновленная облачная конфигурация:

    spring:
      autoconfigure:
        exclude: org.springframework.cloud.netflix.metrics.servo.ServoMetricsAutoConfiguration
      kafka:
        consumer:
          enable-auto-commit: false
      cloud:
        stream:
          kafka:
            binder:
              brokers: "${BROKER_HOST:xyz-aws.local:9092}"
              headers:
                - X-B3-TraceId
                - X-B3-SpanId
                - X-B3-Sampled
                - X-B3-ParentSpanId
                - X-Span-Name
                - X-Process-Id
              zkNodes: "${ZOOKEEPER_HOST:120.211.316.261:2181,120.211.317.252:2181}"
            bindings:
              feed_platform_events_input:
                consumer:
                  autoCommitOffset: false
          binders:
            xyzkafka:
              type: kafka
          bindings:
            feed_platform_events_input:
              binder: xyzkafka
              destination: platform-events
              group: br-platform-events
              consumer:
                maxAttempts: 2147483647
                backOffInitialInterval: 1000
                backOffMaxInterval: 300000
                backOffMultiplier: 2.0

Event Consumer остается таким же, как моя первоначальная реализация. За исключением повторной выдачи ошибки для контейнера, чтобы узнать, что обработка не удалась. Если вы просто поймаете его, то контейнер не сможет узнать, что обработка сообщений имеет сбои. Выполняя Квитирование с подтверждением, вы просто контролируете смещение. Для повторной попытки вы должны сгенерировать исключение. Не забудьте установить свойство autocommit клиента kafka и свойство autocommitOffset Spring (уровень контейнера) в false. Вот и все.

1 Ответ

0 голосов
/ 05 июля 2018

Как объяснил Мариус, Кафка поддерживает только смещение в журнале. Если вы обработаете следующее сообщение и обновите смещение; потерянное сообщение потеряно.

Вы можете отправить сообщение об ошибке в тему недоставленных сообщений (установите для enableDlq значение true).

Последние версии Spring Kafka (2.1.x) имеют специальные обработчики ошибок ContainerStoppingErrorHandler, которые останавливают контейнер при возникновении исключения, и SeekToCurrentErrorHandler, что приводит к повторной доставке сообщения с ошибкой.

...