В приложении Kafka Streams есть ли способ определить топологию с помощью группового списка выходных тем? - PullRequest
0 голосов
/ 24 июня 2019

У меня есть приложение Kafka Streams с несколькими схемами, которое обогащает запись через соединение с KTable, а затем передает обогащенную запись.

Формат имен входных тем в настоящее время четко определен, но я заменяю его на шаблон. Я хочу определить тему ввода каждой записи, вывести тему вывода с помощью замены регулярных выражений и отправить ее дальше.

например. При прослушивании event.raw.* запись поступает на event.raw.foo, и я хочу передать ее на event.foo.

Я понимаю, что могу получить входные темы через Processor API:

public class EnrichmentProcessor extends AbstractProcessor<String, GenericRecord> {

    @Override
    public void process(String key, GenericRecord value) {
        //Do Join...

        //Determine output topic and forward
        String outputTopic = context().topic().replaceFirst(".raw.", ".");
        context().forward(key, value, To.child(outputTopic));
        context().commit();
    }
}

Но это не помогает мне, когда я пытаюсь определить свою топологию, потому что у меня нет возможности узнать заранее, какой будет моя выходная тема.

  InternalTopologyBuilder topologyBuilder = new InternalTopologyBuilder();
        topologyBuilder.addSource("SOURCE", stringDeserializer, genericRecordDeserializer, "event.raw.*")
        .addProcessor("ENRICHER", EnrichmentProcessor::new, "SOURCE")
        .addSink("OUTPUT", outputTopic, stringSerializer, genericRecordSerializer, "ENRICHER"); // How can I register all possible output topics here?

Кто-нибудь разрешал подобную ситуацию раньше?

Я знаю, что если бы у меня был список возможных имен выходных тем, у меня могло бы быть несколько приемников, определенных в топологии, но я не собираюсь.

Есть ли способ определить топологию для динамически назначаемых имен выходных тем, если у меня нет заранее заданного списка возможных имен выходных тем?

1 Ответ

0 голосов
/ 25 июня 2019

Это должно быть возможно: вы можете использовать Topology#addSink(..., new TopicNameExtractor(){...}, ...) для динамического задания имени выходной темы. TopicNameExtractor имеет доступ к RecordContext, который позволяет вам получить имя входной темы через context.topic(). Следовательно, вы должны быть в состоянии вычислить имя выходной темы, основываясь на имени входной темы.

...