Я создал «Streamer», который превращает человека в клиента следующим образом: Это топология:
journal.info("Open topic {}...", kafkaTopic);
StreamsBuilder builder = new StreamsBuilder();
Topology topology = builder.build();
topology.addSource("person$", kafkaTopic)
.addProcessor("selection", PersonProcessor::new, "person$")
.addSink("customer$", customerTopic, "selection");
KafkaStreams streams = new KafkaStreams(topology, props);
streams.start();
Runtime.getRuntime()
.addShutdownHook(new Thread(streams::close));
Это процессор:
public class PersonProcessor extends AbstractProcessor<String, PersonAvro> {
Logger journal = LoggerFactory.getLogger(PersonProcessor.class);
@Override
public void process(String key, PersonAvro avroPerson) {
journal.debug("traitement objet: {}, {}", key, avroPerson.getActive());
Optional.ofNullable(avroPerson)
.filter(person -> Optional.ofNullable(person)
.map(PersonAvro::getActive)
.filter(activation -> !activation.matches("0"))
.isPresent())
.map(person -> CustomerAvro.newBuilder()
.setId(person.getId())
.setCompName(person.getCompName())
.setSiretCode(person.getSiretCode())
.setActive(person.getActive())
.setAdd3(person.getAdd3())
.setAdd4(person.getAdd4())
.setAdd5(person.getAdd5())
.setAdd6(person.getAdd6())
.setAdd7(person.getAdd7()))
.map(CustomerAvro.Builder::build)
.ifPresent(customer -> {
context().forward(key, customer);
context().commit();
});
}
}
И еще один стример, загружающийЛокальное хранилище формирует GlobalKTable
@PostConstruct
private void init() throws InterruptedException {
configurer();
journal.info("Open topic {}...", kafkaTopic);
StreamsBuilder builder = new StreamsBuilder();
builder.globalTable(kafkaTopic, Materialized.as("customerStore"));
Topology topology = builder.build();
KafkaStreams streams = new KafkaStreams(topology, props);
streams.start();
customerStore = waitUntilStoreIsQueryable("customerStore", streams);
Runtime.getRuntime()
.addShutdownHook(new Thread(streams::close));
}
И это может отвечать на запросы синхронизации:
public Optional<CustomerDto> getClient(int idCoclico) {
journal.debug(Markers.append("idCoclico", idCoclico), "Recherche d'un client COCLICO");
// Recherche du client dans le cache
Optional<CustomerDto> optClient = Optional.ofNullable(idCoclico)
.map(String::valueOf)
.map(customerStore::get)
.map(avroCustomer -> {
journal.debug(Markers.append("idCoclico", idCoclico),
"Le client existe dans le store local et n'est pas inactif");
CustomerDto client = new CustomerDto(avroCustomer.getId());
client.setCompName(avroCustomer.getCompName());
client.setSiretCode(avroCustomer.getSiretCode());
client.setAdd3(avroCustomer.getAdd3());
client.setAdd4(avroCustomer.getAdd4());
client.setAdd5(avroCustomer.getAdd5());
client.setAdd6(avroCustomer.getAdd6());
client.setAdd7(avroCustomer.getAdd7());
Optional<String> optAdd = Optional.ofNullable(avroCustomer.getAdd7())
.map(String::trim)
.filter(add -> !add.isEmpty());
// Si l'adresse est renseignée dans COCLICO
if (optAdd.isPresent())
client.setCountryCode(avroCustomer.getCountryCode());
// Les adresses Françaises ne sont pas renseignée
else
client.setCountryCode(fr.laposte.bscc.encaissement.Constantes.CODE_PAYS_FRANCE);
return client;
});
if (!optClient.isPresent())
journal.info(Markers.append("idCoclico", idCoclico), "Le client n'existe pas dans le store local");
return optClient;
}
Первые тесты или ок.Я попытаюсь развернуть это в среде сборки ...