Динамическая маршрутизация / дублирование на несколько тем в Kafka 2 - PullRequest
0 голосов
/ 05 февраля 2019

Kafka 2 добавил поддержку динамической маршрутизации через интерфейс TopicNameExtractor, который поддерживает только одно значение темы.

Возможно, я собираюсь описать плохой дизайн, но на данном этапе мне просто интересно, что возможнов потоках Кафки.

Если предположить, что каждое сообщение содержит список тегов, существует ли способ дублировать сообщение на несколько тем на основе этого списка тегов?

Ответы [ 2 ]

0 голосов
/ 05 февраля 2019

Как сказал Матиас, вы должны дублировать сообщения.Это можно легко сделать, используя KStream::flatMapValues(ValueMapperWithKey ...)

Пример кода приведен ниже.Сообщение будет продублировано на основе tags: List<String>.

Модель:

public class Person {
    public String name;
    public List<String> tags;
    public transient String mainTag;
    public Person(String name, List<String> tags) {
        this.name = name;
        this.tags = tags;
    }

    public Person(String name, List<String> tags, String mainTag) {
        this.name = name;
        this.tags = tags;
        this.mainTag = mainTag;
    }
}

Приложение:

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "app1");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, PersonSerdes.class);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
StreamsBuilder builder = new StreamsBuilder();

KStream<String, Person> input = builder.stream("input");
input.flatMapValues(((readOnlyKey, person) ->
        person.tags
                .stream()
                .map(tag -> new Person(person.name, person.tags, tag))
                .collect(Collectors.toList()))
).to((key, person, recordContext) -> person.mainTag);
0 голосов
/ 05 февраля 2019

Это невозможно в данный момент.Однако для этого уже есть запрос функции: https://issues.apache.org/jira/browse/KAFKA-7578

На данный момент запись записи в несколько выходных тем возможна только в том случае, если запись дублирована и отправлена ​​в несколько приемников.

...