Я не понимаю, почему хранилище statStore пусто в этой топологии:
StreamsBuilder builder = new StreamsBuilder();
KTable<String, PersonAvro> person$ = builder.table("sc_loc_person_priv_v4");
person$.mapValues(person -> CustomerAvro.newBuilder()
.setId(person.getId())
.setCompName(person.getCompName())
.setSiretCode(person.getSiretCode())
.setActive(person.getActive())
.setAdd3(person.getAdd3())
.setAdd4(person.getAdd4())
.setAdd5(person.getAdd5())
.setAdd6(person.getAdd6())
.setAdd7(person.getAdd7()))
.mapValues(CustomerAvro.Builder::build)
.toStream()
.peek((key, personne) -> {
journal.debug("{}, {}", key, personne);
})
.through("sc_loc_customer_priv_v1")
.groupBy((String key, CustomerAvro customer) -> customer.getActive())
.count(Materialized.as("statStore"));
Topology topology = builder.build();
KafkaStreams streams = new KafkaStreams(topology, props);
streams.start();
store = waitUntilStoreIsQueryable("statStore", streams);
Естественно, я помещаю сообщения в тему sc_loc_person_priv_v4.
Если я просто делаю эту топологию,магазин не пустой:
@PostConstruct
private void init() throws InterruptedException {
configurer();
producer = new KafkaProducer<>(props);
StreamsBuilder builder = new StreamsBuilder();
builder.globalTable(kafkaTopic, Materialized.as("customerStore"));
Topology topology = builder.build();
KafkaStreams streams = new KafkaStreams(topology, props);
streams.start();
store = waitUntilStoreIsQueryable("customerStore", streams);
}
public long getNb() {
return store.approximateNumEntries();
}