Высокая потребительская задержка в приложении Kafka Stream, которое использует постоянное хранилище значений ключей - PullRequest
0 голосов
/ 18 мая 2018

Я не получаю такую ​​большую пропускную способность, как мне кажется, с моим приложением this Streams, которое использует хранилище состояний, и мое потребительское отставание увеличивается.Мне интересно, есть ли какие-нибудь очевидные конфиги, которые я мог бы настроить, или что-нибудь, что могло бы помочь оптимизировать мою пропускную способность.

В моем основном случае у меня есть это для свойств:

public static void main(String[] args) {
    final Properties config = new Properties();
    config.put(StreamsConfig.APPLICATION_ID_CONFIG, "reading-stream");
    config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_URL);
    config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    config.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, "52428800");
    config.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 0);

    final MyStream stream = new MyStream(config);
    stream.start();

    Runtime.getRuntime().addShutdownHook(new Thread(stream::close));
}

В MyStream.start () - это код:

final Serde readingSerde = Utils.createSerde(Reading.class);
final Serde groupSerde = Utils.createSerde(Group.class);
final Serde checkStateSerde = Utils.createSerde(CheckState.class);
final StreamsBuilder builder = new StreamsBuilder();

final StoreBuilder storeSupplier =
    Stores.keyValueStoreBuilder(
        Stores.persistentKeyValueStore(STORE_STATE),
        Serdes.String(),
        checkStateSerde
    ).withLoggingEnabled(new HashMap());
builder.addStateStore(storeSupplier);

final KTable<String, Group> group = builder
    .table("group-topic",
        Consumed.with(Serdes.String(), groupSerde)
    );

final KStream<String, Reading> readings = builder
    .stream("reading-topic",
        Consumed.with(Serdes.String(), readingSerde)
            .withOffsetResetPolicy(Topology.AutoOffsetReset.LATEST));

readings
    .join(group,
        ....
    ).flatMap
        ....
        }).process(() -> new MyProcessor(), STORE_STATE);

final Topology topology = builder.build();
streams = new KafkaStreams(topology, this.config);
streams.start();

Также рассматривается использование другого типа жесткого диска в AWS (сейчас используется gp2) ...

Любые другие идеи, которые помогут оптимизировать этокод

...