Я не получаю такую большую пропускную способность, как мне кажется, с моим приложением this Streams, которое использует хранилище состояний, и мое потребительское отставание увеличивается.Мне интересно, есть ли какие-нибудь очевидные конфиги, которые я мог бы настроить, или что-нибудь, что могло бы помочь оптимизировать мою пропускную способность.
В моем основном случае у меня есть это для свойств:
public static void main(String[] args) {
final Properties config = new Properties();
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "reading-stream");
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_URL);
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
config.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, "52428800");
config.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 0);
final MyStream stream = new MyStream(config);
stream.start();
Runtime.getRuntime().addShutdownHook(new Thread(stream::close));
}
В MyStream.start () - это код:
final Serde readingSerde = Utils.createSerde(Reading.class);
final Serde groupSerde = Utils.createSerde(Group.class);
final Serde checkStateSerde = Utils.createSerde(CheckState.class);
final StreamsBuilder builder = new StreamsBuilder();
final StoreBuilder storeSupplier =
Stores.keyValueStoreBuilder(
Stores.persistentKeyValueStore(STORE_STATE),
Serdes.String(),
checkStateSerde
).withLoggingEnabled(new HashMap());
builder.addStateStore(storeSupplier);
final KTable<String, Group> group = builder
.table("group-topic",
Consumed.with(Serdes.String(), groupSerde)
);
final KStream<String, Reading> readings = builder
.stream("reading-topic",
Consumed.with(Serdes.String(), readingSerde)
.withOffsetResetPolicy(Topology.AutoOffsetReset.LATEST));
readings
.join(group,
....
).flatMap
....
}).process(() -> new MyProcessor(), STORE_STATE);
final Topology topology = builder.build();
streams = new KafkaStreams(topology, this.config);
streams.start();
Также рассматривается использование другого типа жесткого диска в AWS (сейчас используется gp2) ...
Любые другие идеи, которые помогут оптимизировать этокод