Перераспределение потока в топологии DSL с помощью selectKey и transform - PullRequest
0 голосов
/ 14 февраля 2019

Я чувствую, что, возможно, мне не хватает чего-то очень простого, но я все равно спрошу.

Существует тема ввода с несколькими разделами.Я использую selectKey как часть топологии DSL.SelectKey всегда возвращает одно и то же значение.Я ожидаю, что после внутреннего переразделения, вызванного selectKey (), следующий процессор в топологии будет вызываться в том же разделе для того же ключа.Однако следующий процессор transform () вызывается в разных разделах для одного и того же ключа.

Топология:

    Topology buildTopology() {
        final StreamsBuilder builder = new StreamsBuilder();


        builder
            .stream("in-topic", Consumed.with(Serdes.String(), new JsonSerde<>(CatalogEvent.class)))
            .selectKey((k,v) -> "key")
            .transform(() -> new Processor())
            .print();

        return builder.build();
    }

Класс процессора, используемый преобразованием

public class Processor implements Transformer<String, CatalogEvent, KeyValue<String, DispEvent>> {

    private ProcessorContext context;

    @Override
    public void init(ProcessorContext context) {
        this.context = context;
    }

    @Override
    public KeyValue<String, DispEvent> transform(String key, CatalogEvent catalogEvent) {
        System.out.println("key:" + key + " partition:" + context.partition());
        return null;
    }

    @Override
    public KeyValue<String, DispatcherEvent> punctuate(long timestamp) {
        // TODO Auto-generated method stub
        return null;
    }

    @Override
    public void close() {
        // TODO Auto-generated method stub

    }
}

"in-topic" имеет два сообщения со случайными UUID в качестве ключей, т.е. "8f45e552-8886-4781-bb0c-79ca98f9d927", "a794ed2a-6f7d-4522-a7ac-27c51a64fa28", полезная нагрузка для обоих сообщений одинакова

Выходные данные Processor :: transform для двух UUID:

key:key partition: 2
key:key partition: 0

Как изменить топологию, чтобы сообщения с одинаковым ключом поступали в один и тот же раздел - мне нужно, чтобы убедиться, чточто сообщения с одним и тем же ключом будут отправляться в один и тот же локальный экземпляр магазина Kafka (для вставки или обновления).

1 Ответ

0 голосов
/ 14 февраля 2019

Для process(), transform() и transformValues() автоматическое переназначение отсутствует.Вам нужно будет вставить ручной through() вызов для перераспределения данных.Если вы сравните JavaDocs (с groupBy() или join(), которые поддерживают автоматическое перераспределение), вы увидите, что автоматическое перераспределение для них не упоминается.

Причина в том, что эти три метода являются частью ProcessorИнтеграция API в DSL, и, следовательно, нет операторов DSL.Их семантика неизвестна, и поэтому мы не можем сказать, требуют ли они перераспределения, был ли ключ изменен или нет.Чтобы избежать ненужного перераспределения, автоматическое перераспределение не выполняется.

Также существует соответствующий Jira: https://issues.apache.org/jira/browse/KAFKA-7608

...