Я чувствую, что, возможно, мне не хватает чего-то очень простого, но я все равно спрошу.
Существует тема ввода с несколькими разделами.Я использую selectKey как часть топологии DSL.SelectKey всегда возвращает одно и то же значение.Я ожидаю, что после внутреннего переразделения, вызванного selectKey (), следующий процессор в топологии будет вызываться в том же разделе для того же ключа.Однако следующий процессор transform () вызывается в разных разделах для одного и того же ключа.
Топология:
Topology buildTopology() {
final StreamsBuilder builder = new StreamsBuilder();
builder
.stream("in-topic", Consumed.with(Serdes.String(), new JsonSerde<>(CatalogEvent.class)))
.selectKey((k,v) -> "key")
.transform(() -> new Processor())
.print();
return builder.build();
}
Класс процессора, используемый преобразованием
public class Processor implements Transformer<String, CatalogEvent, KeyValue<String, DispEvent>> {
private ProcessorContext context;
@Override
public void init(ProcessorContext context) {
this.context = context;
}
@Override
public KeyValue<String, DispEvent> transform(String key, CatalogEvent catalogEvent) {
System.out.println("key:" + key + " partition:" + context.partition());
return null;
}
@Override
public KeyValue<String, DispatcherEvent> punctuate(long timestamp) {
// TODO Auto-generated method stub
return null;
}
@Override
public void close() {
// TODO Auto-generated method stub
}
}
"in-topic" имеет два сообщения со случайными UUID в качестве ключей, т.е. "8f45e552-8886-4781-bb0c-79ca98f9d927", "a794ed2a-6f7d-4522-a7ac-27c51a64fa28", полезная нагрузка для обоих сообщений одинакова
Выходные данные Processor :: transform для двух UUID:
key:key partition: 2
key:key partition: 0
Как изменить топологию, чтобы сообщения с одинаковым ключом поступали в один и тот же раздел - мне нужно, чтобы убедиться, чточто сообщения с одним и тем же ключом будут отправляться в один и тот же локальный экземпляр магазина Kafka (для вставки или обновления).