Как отфильтровать ненужные записи перед материализацией GlobalKTable? - PullRequest
0 голосов
/ 07 июня 2018

С Kafka Stream я всегда инициализирую свой магазин из ссылочных компактных тем, используя этот код:

builder.globalTable(kafkaTopic, Materialized.as("storeMerchant"));
Topology topology = builder.build();
KafkaStreams streams = new KafkaStreams(topology, props);
streams.start();

Я бы хотел отфильтровать тему kafkaTopic перед созданием магазина, чтобы устранить некоторые ненужныекупцы.

Примерно так:

GlobalKTable<String, MerchantAvro> merchant$ = builder.globalTable(kafkaTopic);
merchant$.filter((key, value) -> !Optional.ofNullable(value)
         .map(MerchantAvro::getDeletionDate)
         .isPresent());
...

Но невозможно применить filter метод к GlobalKTable.

Как я могу выполнить эту фильтрацию?

Ответы [ 2 ]

0 голосов
/ 14 июня 2018

Я создал «Streamer», который превращает человека в клиента следующим образом: Это топология:

    journal.info("Open topic {}...", kafkaTopic);
    StreamsBuilder builder = new StreamsBuilder();
    Topology topology = builder.build();
    topology.addSource("person$", kafkaTopic)
            .addProcessor("selection", PersonProcessor::new, "person$")
            .addSink("customer$", customerTopic, "selection");
    KafkaStreams streams = new KafkaStreams(topology, props);
    streams.start();
    Runtime.getRuntime()
            .addShutdownHook(new Thread(streams::close));

Это процессор:

public class PersonProcessor extends AbstractProcessor<String, PersonAvro> {
    Logger journal = LoggerFactory.getLogger(PersonProcessor.class);

    @Override
    public void process(String key, PersonAvro avroPerson) {
        journal.debug("traitement objet: {}, {}", key, avroPerson.getActive());
        Optional.ofNullable(avroPerson)
                .filter(person -> Optional.ofNullable(person)
                        .map(PersonAvro::getActive)
                        .filter(activation -> !activation.matches("0"))
                        .isPresent())
                .map(person -> CustomerAvro.newBuilder()
                        .setId(person.getId())
                        .setCompName(person.getCompName())
                        .setSiretCode(person.getSiretCode())
                        .setActive(person.getActive())
                        .setAdd3(person.getAdd3())
                        .setAdd4(person.getAdd4())
                        .setAdd5(person.getAdd5())
                        .setAdd6(person.getAdd6())
                        .setAdd7(person.getAdd7()))
                .map(CustomerAvro.Builder::build)
                .ifPresent(customer -> {
                    context().forward(key, customer);
                    context().commit();
                });
    }
}

И еще один стример, загружающийЛокальное хранилище формирует GlobalKTable

@PostConstruct
private void init() throws InterruptedException {
    configurer();
    journal.info("Open topic {}...", kafkaTopic);
    StreamsBuilder builder = new StreamsBuilder();
    builder.globalTable(kafkaTopic, Materialized.as("customerStore"));
    Topology topology = builder.build();
    KafkaStreams streams = new KafkaStreams(topology, props);
    streams.start();
    customerStore = waitUntilStoreIsQueryable("customerStore", streams);
    Runtime.getRuntime()
            .addShutdownHook(new Thread(streams::close));
}

И это может отвечать на запросы синхронизации:

public Optional<CustomerDto> getClient(int idCoclico) {
    journal.debug(Markers.append("idCoclico", idCoclico), "Recherche d'un client COCLICO");
    // Recherche du client dans le cache
    Optional<CustomerDto> optClient = Optional.ofNullable(idCoclico)
            .map(String::valueOf)
            .map(customerStore::get)
            .map(avroCustomer -> {
                journal.debug(Markers.append("idCoclico", idCoclico),
                        "Le client existe dans le store local et n'est pas inactif");
                CustomerDto client = new CustomerDto(avroCustomer.getId());
                client.setCompName(avroCustomer.getCompName());
                client.setSiretCode(avroCustomer.getSiretCode());
                client.setAdd3(avroCustomer.getAdd3());
                client.setAdd4(avroCustomer.getAdd4());
                client.setAdd5(avroCustomer.getAdd5());
                client.setAdd6(avroCustomer.getAdd6());
                client.setAdd7(avroCustomer.getAdd7());
                Optional<String> optAdd = Optional.ofNullable(avroCustomer.getAdd7())
                        .map(String::trim)
                        .filter(add -> !add.isEmpty());
                // Si l'adresse est renseignée dans COCLICO
                if (optAdd.isPresent())
                    client.setCountryCode(avroCustomer.getCountryCode());
                // Les adresses Françaises ne sont pas renseignée
                else
                    client.setCountryCode(fr.laposte.bscc.encaissement.Constantes.CODE_PAYS_FRANCE);
                return client;
            });
    if (!optClient.isPresent())
        journal.info(Markers.append("idCoclico", idCoclico), "Le client n'existe pas dans le store local");
    return optClient;
}

Первые тесты или ок.Я попытаюсь развернуть это в среде сборки ...

0 голосов
/ 08 июня 2018

Сначала вам нужно будет отфильтровать тему и поместить результат в другую тему.Затем вы можете использовать вторую тему как GlobalKTable.

В качестве альтернативы вы можете использовать «глобальное хранилище» вместо GlobalKTable.Для этого случая вы можете предоставить пользовательский Processor, который может реализовать фильтр перед заполнением глобального хранилища.См. Определение потокового процессора .

Глобальные хранилища также являются локальными.Разница заключается в том, что для «обычного хранилища» данные разделены, то есть каждое хранилище содержит разные данные, в то время как для глобальных хранилищ каждый экземпляр загружает все данные (т. Е. Данные реплицируются).Таким образом, каждый член группы имеет собственную копию данных глобального хранилища.

...