Кафка: обработка точно - нужна ли вам ваша потоковая программа для создания темы кафки? - PullRequest
1 голос
/ 16 октября 2019

У меня есть приложение kafka streams, использующее тему kafka. Он только потребляет и обрабатывает данные, но ничего не производит.

Для того, чтобы обработка Kafka в точности_once работала, вам также нужно ваше потоковое приложение для записи в тему kafka?

Как вы можете достичь ровно того времени, если ваше потоковое приложение хочет обработать сообщение только один раз, но ничего не генерирует?

Ответы [ 2 ]

1 голос
/ 16 октября 2019

Предоставление семантики обработки «точно один раз» действительно означает, что отдельные обновления состояния оператора, которым управляет механизм потоковой обработки, отражаются только один раз. «Ровно один раз» ни в коем случае не гарантирует, что обработка события, то есть выполнение произвольной пользовательской логики, произойдет только один раз.

Выше приведено объяснение семантики «Ровно один раз». .

Нет необходимости публиковать вывод в теме всегда в приложении KStream.

Когда вы используете приложения KStream, вы должны определить applicationID для каждого, который использует потребителя вбэкенд. В приложении вам нужно настроить несколько параметров, таких как processing.guarantee - exactly_once и enable.idempotence

Вот подробности:
https://kafka.apache.org/22/documentation/streams/developer-guide/config-streams#processing-guarantee

1 голос
/ 16 октября 2019

Я не конфликтую с шаблоном потока "Точно один раз", потому что это красота Kafka Stream, однако можно использовать Kafka Stream, не создавая другие темы.

Шаблон потока "ровно один раз" - это просто возможность выполнитьоперация чтения-процесса-записи ровно один раз. Означает, что вы потребляете одно сообщение за раз, получаете процесс и публикуетесь в другой теме и фиксируете. Таким образом, commit будет обрабатываться Stream автоматически по одному сообщению за раз. Для этого kafka Stream устанавливает следующие параметры, которые нельзя перезаписать

  • изоляция. Уровень: (read_committed) - Потребители всегда будут читать только зафиксированные данные
  • enable.idempotence:(true) - у производителя всегда будет включена идемпотентность
  • max.in.flight.requests.per.connection "(5) - у производителя всегда будет один запрос в полете на соединение

В случае, если какая-либо ошибка в потоке kafka потребителя или производителя всегда повторяет определенное сконфигурированное число попыток KafkaStream не гарантирует внутри логики обработки, которую мы все еще должны обработать, например, есть требование для работы с БД, и если соединение с БД не удалось вв этом случае Кафка не знает, поэтому вам нужно справиться с этим самостоятельно.

В соответствии с определением шаблона, да, нам нужна тема для потребителей, процессов и производителей, но в целом это не останавливает вас, если вы не выводите на другойТем не менее, вы можете потреблять ровно один элемент за один раз с фиксацией интервала времени по умолчанию (DEFAULT_COMMIT_INTERVAL_MS) и снова вам нужно самостоятельно обработать свой сбой логической транзакции. Я приведу несколько примеров.

StreamsBuilder builder = new StreamsBuilder();
        Properties props = getStreamProperties();
        KStream<String, String> textLines = builder.stream(Pattern.compile("topic"));
        textLines.process(() -> new ProcessInternal());

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        final CountDownLatch latch = new CountDownLatch(1);

        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            logger.info("Completed VQM stream");
            streams.close();
        }));

        logger.info("Streaming start...");

        try {
            streams.start();
            latch.await();
        } catch (Throwable e) {
            System.exit(1);
        }



        class ProcessInternal implements Processor<String, String> {
        private ProcessorContext context;
        @Override
        public void init(ProcessorContext context) {
            this.context = context;

        }

        @Override
        public void close() {
            // Any code for clean up would go here. 
        }
        @Override
        public void process(String key, String value) {
        ///Your transactional process business logic

        }
    }
...