Сброс приложения Kafka Stream - PullRequest
0 голосов
/ 16 октября 2019

пытается понять, как использовать инструмент сброса для приложения kafka stream. Однако я смущен объяснением.

Говорят, что есть два этапа: локальный отдых (очистка), за которым следует глобальный сброс, использующий инструмент сброса.

Следующий пример представлен в слиянии:

https://docs.confluent.io/current/streams/developer-guide/app-reset-tool.html#step-2-reset-the-local-environments-of-your-application-instances


    package io.confluent.examples.streams;

import ...;

public class ResetDemo {

  public static void main(String[] args) throws Exception {
    // Kafka Streams configuration
    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");
    // ...and so on...

    // Define the processing topology
    StreamsBuilder builder = new StreamsBuilder();
    builder.stream("my-input-topic")
        .selectKey(...)
        .through("rekeyed-topic")
        .groupByKey()
        .count("global-count")
        .to("my-output-topic");

    KafkaStreams app = new KafkaStreams(builder.build(), props);

    // Delete the application's local state.
    // Note: In real application you'd call `cleanUp()` only under
    // certain conditions.  See tip on `cleanUp()` below.
    app.cleanUp();

    app.start();

    // Note: In real applications you would register a shutdown hook
    // that would trigger the call to `app.close()` rather than
    // using the sleep-then-close example we show here.
    Thread.sleep(30 * 1000L);
    app.close();
  }

}

, за которым следует следующий пример:

Совет. Во избежание соответствующих затрат на восстановление не следует вызывать cleanUp (безоговорочно и каждый раз, когда экземпляр приложения перезапускается или возобновляется. Например, в производственном приложении вы можете использовать аргументы командной строки для включения или отключения вызова cleanUp () по мере необходимости.

Затем можно выполнить циклы выполнения-сброса-изменения следующим образом:

    # Run your application
  bin/kafka-run-class io.confluent.examples.streams.ResetDemo

# After stopping all application instances, reset the application
  bin/kafka-streams-application-reset --application-id my-streams-app \
                                      --input-topics my-input-topic \
                                      --intermediate-topics rekeyed-topic

# Now you can modify/recompile as needed and then re-run the application again.
# You can also experiment, for example, with different input data without
# modifying the application.

Я не понимаю этого.

  1. Требует ли app.cleanUp () выполнения app.start ()?
  2. Почему после app.cleanUp () следует app.start (), в чемобстоятельства могут быть полезны?
  3. Почему бы просто не иметь флаг, который говорит, что это cleanUp, а затем просто запустить app.cleanUp (), и он не запустится app.start ();

Я просто не понимаю последнюю часть примера

Запустите ваше приложение и примените инструмент сброса. но если приложение выглядит как в примере, то после очистки приложение запустится и у нас будут локальные данные. Так что остановка не поможет остальным инструментам.

Это очень запутанно. Может ли кто-нибудь, кто понимает это, объяснить, пожалуйста?

1 Ответ

1 голос
/ 16 октября 2019

Запуск инструмента сброса приложения гарантирует, что состояние вашего приложения - как отслеживается глобально в настроенном кластере приложения Kafka - сбрасывается. Тем не менее, по своему замыслу инструмент сброса не изменяет и не сбрасывает локальную среду экземпляров приложения, которая включает в себя локальный каталог состояния приложения. Для полного сброса приложения необходимо также удалить локальный каталог состояния приложения на всех компьютерах, на которых был запущен экземпляр приложения, до перезапуска экземпляра приложения на тех же компьютерах. Вы можете либо использовать метод API KafkaStreams # cleanUp () в своем коде приложения, либо вручную удалить соответствующий каталог локальных состояний (state.dir параметр конфигурации).

app.cleanup() метод удаляет локальные хранилища состояний. app.startUp() запускает приложение KStream.

  1. Требует ли app.cleanUp () выполнения app.start ()?

app.cleanup() метод удаляет локальные хранилища состояний. app.startUp() требуется для запуска приложения KStream. Если вы хотите просто убрать магазины, вы можете позвонить только cleanUp(). Но если вы хотите запустить приложение, вам нужно также вызвать метод startUp().

Зачем иметь app.cleanUp (), а затем app.start (), при каких обстоятельствах это будет полезно?

Когда вы хотите сбросить приложение до самого раннегосмещение и запустить его с начала, нужно вызвать метод cleanUp(). Если вы хотите возобновить остановленное приложение из текущего состояния, не вызывайте cleanUp () в коде вашего приложения. Не рекомендуется использовать очистку по умолчанию как часть приложения, если в этом нет необходимости. Также вы не можете удалить хранилища в работающем приложении, поэтому всегда вызывайте cleanUp() перед startUp() методом.

Почему бы просто не иметь флаг, который говорит, что это cleanUp, тогда просто запустите app.cleanUp (), и он не запустится app.start ();

Да, выможет сделать это на основе вашего варианта использования. Определите флаг в файле конфигурации и добавьте условную очистку. Пример:

if(cleanUpFlag){
   app.cleanUp();
}
app.start();

Если вы хотите автоматизировать весь процесс для глобальной и локальной очистки, вы можете написать скрипт, который принимает файл конфигурации в качестве входных данных. Вы можете обновить значение флага flag=true в этом файле конфигурации как true. Также убедитесь, что переменная config используется в условии if.

if(properties.getProperty("cleanUp.flag") == true){
       app.cleanUp();
    }

Пример:

# After stopping all application instances, reset the application
$ bin/kafka-streams-application-reset --application-id my-streams-app \
                                      --input-topics my-input-topic \
                                      --intermediate-topics rekeyed-topic \
                                       --bootstrap-servers brokerHost:9092 \
                                      --zookeeper zookeeperHost:2181


# Run your application
$ java –DApp.config.file=app.properties –jar KStreamDemo.jar
...