У меня есть приложение Kafka Streams с несколькими схемами, которое обогащает запись через соединение с KTable, а затем передает обогащенную запись.
Формат имен входных тем в настоящее время четко определен, но я заменяю его на шаблон. Я хочу определить тему ввода каждой записи, вывести тему вывода с помощью замены регулярных выражений и отправить ее дальше.
например. При прослушивании event.raw.*
запись поступает на event.raw.foo
, и я хочу передать ее на event.foo
.
Я понимаю, что могу получить входные темы через Processor API:
public class EnrichmentProcessor extends AbstractProcessor<String, GenericRecord> {
@Override
public void process(String key, GenericRecord value) {
//Do Join...
//Determine output topic and forward
String outputTopic = context().topic().replaceFirst(".raw.", ".");
context().forward(key, value, To.child(outputTopic));
context().commit();
}
}
Но это не помогает мне, когда я пытаюсь определить свою топологию, потому что у меня нет возможности узнать заранее, какой будет моя выходная тема.
InternalTopologyBuilder topologyBuilder = new InternalTopologyBuilder();
topologyBuilder.addSource("SOURCE", stringDeserializer, genericRecordDeserializer, "event.raw.*")
.addProcessor("ENRICHER", EnrichmentProcessor::new, "SOURCE")
.addSink("OUTPUT", outputTopic, stringSerializer, genericRecordSerializer, "ENRICHER"); // How can I register all possible output topics here?
Кто-нибудь разрешал подобную ситуацию раньше?
Я знаю, что если бы у меня был список возможных имен выходных тем, у меня могло бы быть несколько приемников, определенных в топологии, но я не собираюсь.
Есть ли способ определить топологию для динамически назначаемых имен выходных тем, если у меня нет заранее заданного списка возможных имен выходных тем?