Запуск нескольких топологий в одном экземпляре - PullRequest
1 голос
/ 28 мая 2019

Я хочу запустить 2 топологии на одном экземпляре.1 топология включает хранилище состояний, а другая - глобальное хранилище.Как мне сделать это успешно?

Я создал 1 тему с 3 разделами, а затем добавил хранилище состояний в 1 топологию и глобальное хранилище во 2 топологии.

Топология 1:

    public void createTopology() {
    Topology topology = new Topology();

    topology.addSource("source", new KeyDeserializer(), new ValueDeserializer(), "topic1");
    topology.addProcessor("processor1", new CustomProcessorSupplier1(), "source");

    final KeyValueStoreBuilder<Bytes, byte[]> rStoreBuilder = new KeyValueStoreBuilder<>(new RocksDbKeyValueBytesStoreSupplier("rstore"), Serdes.Bytes(), Serdes.ByteArray(), Time.SYSTEM);
    rStoreBuilder.withLoggingEnabled(new HashMap<>());

    topology.addStateStore(rStoreBuilder, "processor1");

    Properties p = new Properties();
    p.put(APPLICATION_ID_CONFIG, "stream1");
    p.put(BOOTSTRAP_SERVERS_CONFIG, KafkaUtil.getBootStrapServers());
    p.put(DEFAULT_KEY_SERDE_CLASS_CONFIG, KeySerde.class);
    p.put(DEFAULT_VALUE_SERDE_CLASS_CONFIG, ValueSerde.class);
    streams = new KafkaStreams(topology, p);
    streams.start();
}

Топология 2:

public void createTopology() {
    Topology topology = new Topology();

    final KeyValueStoreBuilder<Bytes, byte[]> rStoreBuilder = new KeyValueStoreBuilder<>(new RocksDbKeyValueBytesStoreSupplier("rstoreg"), Serdes.Bytes(), Serdes.ByteArray(), Time.SYSTEM);
    rStoreBuilder.withLoggingDisabled();

    topology.addGlobalStore(rStoreBuilder, "globalprocessname", Serdes.Bytes().deserializer(), Serdes.ByteArray().deserializer(), "topic1", "processor2", new CustomProcessorSupplier1());

    Properties p = new Properties();
    p.put(APPLICATION_ID_CONFIG, "stream1");
    p.put(BOOTSTRAP_SERVERS_CONFIG, KafkaUtil.getBootStrapServers());
    p.put(DEFAULT_KEY_SERDE_CLASS_CONFIG, KeySerde.class);
    p.put(DEFAULT_VALUE_SERDE_CLASS_CONFIG, ValueSerde.class);
    p.put(STATE_DIR_CONFIG, "/tmp/" + System.getProperty("server.port"));
    streams = new KafkaStreams(topology, p);
    streams.start();
}
}

При запуске одного экземпляра: -

Ожидается: и хранилище состояний, и глобальное хранилище должны содержать все ключи (данные из всех входных разделов топика1

Факт: хранилище состояний содержит данные из 2 разделов. Глобальное хранилище содержит данные из 1 раздела

При запуске 2 экземпляров этого кода: -

Ожидается: оба глобальных хранилища должны содержать все данные. 3 раздела разделены между 2 хранилищами состояний и содержат частичные данные

Actual: (S означает хранилище состояний, G означает глобальное хранилище, P означает разделениевходных данных) S1 - P1 G1 - P2 S2 - P3 G2 - P1, P2, P3

1 Ответ

2 голосов
/ 28 мая 2019

Проблема с StreamsConfig.APPLICATION_ID_CONFIG. Вы используете один и тот же для двух разных типов приложений.

Значение StreamsConfig.APPLICATION_ID_CONFIG используется как group.id. group.id используется для масштабирования приложения. Если у вас есть два экземпляра одного и того же приложения (с одинаковым group.id), они начнут обрабатывать сообщения из подмножества разделов.

В вашем случае у вас есть два разных приложения, но они использовали одно и то же StreamsConfig.APPLICATION_ID_CONFIG. Для каждого из них назначается подмножество разделов (разделы App1: 2, раздел App2: 1), и они обрабатывают только подмножество всего сообщения. Это потребительский групповой механизм.

Подробнее о группе потребителей вы можете найти:

...