Делая топологию карты / сгруппировать / считать, магазин пуст - PullRequest
0 голосов
/ 13 июня 2018

Я не понимаю, почему хранилище statStore пусто в этой топологии:

    StreamsBuilder builder = new StreamsBuilder();
    KTable<String, PersonAvro> person$ = builder.table("sc_loc_person_priv_v4");
    person$.mapValues(person -> CustomerAvro.newBuilder()
            .setId(person.getId())
            .setCompName(person.getCompName())
            .setSiretCode(person.getSiretCode())
            .setActive(person.getActive())
            .setAdd3(person.getAdd3())
            .setAdd4(person.getAdd4())
            .setAdd5(person.getAdd5())
            .setAdd6(person.getAdd6())
            .setAdd7(person.getAdd7()))
            .mapValues(CustomerAvro.Builder::build)
            .toStream()
            .peek((key, personne) -> {
                journal.debug("{}, {}", key, personne);
            })
            .through("sc_loc_customer_priv_v1")
            .groupBy((String key, CustomerAvro customer) -> customer.getActive())
            .count(Materialized.as("statStore"));
    Topology topology = builder.build();
    KafkaStreams streams = new KafkaStreams(topology, props);
    streams.start();
    store = waitUntilStoreIsQueryable("statStore", streams);

Естественно, я помещаю сообщения в тему sc_loc_person_priv_v4.

Если я просто делаю эту топологию,магазин не пустой:

@PostConstruct
private void init() throws InterruptedException {
    configurer();
    producer = new KafkaProducer<>(props);
    StreamsBuilder builder = new StreamsBuilder();
    builder.globalTable(kafkaTopic, Materialized.as("customerStore"));
    Topology topology = builder.build();
    KafkaStreams streams = new KafkaStreams(topology, props);
    streams.start();
    store = waitUntilStoreIsQueryable("customerStore", streams);
}

public long getNb() {
    return store.approximateNumEntries();
}
...