Вы детерминировано отправляете записи в определенный раздел?Другими словами, вы знаете фактический раздел для каждого ключа?Если вы укажете только PartitionKeyExtractorStrategy
, то механизм связывания будет произвольно выбирать раздел для отправки этой записи.Если вы хотите сделать его детерминированным, вы можете предоставить partitionSelectorClass
как свойство (реализовать интерфейс PartitionSelectorStrategy
) на стороне вашего производителя.Этот интерфейс позволяет выбрать раздел на основе ключа.Допустим, вы хотите отправить все записи с ключом UUID-1 в раздел 1
, и вы кодировали это с помощью реализации PartitionSelectorStrategy
.Это означает, что ваш процессор потоков kafka знает, что записи с ключом UUID-1 поступают из раздела 1
.С этими допущениями вы можете сделать следующее в вашем процессоре потоков kafka.По сути, это вариант этого ответа для одного из ваших других вопросов.
@StreamListener("requesti")
@SendTo("responseo")
public KStream<UUID,Account> process(KStream<UUID, Account> events) {
return events.transform(() -> new Transformer<UUID, Account, KeyValue<UUID, Account>>() {
ProcessorContext context;
@Override
public void init(ProcessorContext context) {
this.context = context;
}
@Override
public KeyValue<UUID,Account> transform(UUID key, Account value) {
if (this.context.partition() == 1) {
//your processing logic
return KeyValue.pair(key, value);
}
return null;
}
@Override
public void close() {
}
});
}
С помощью приведенного выше кода вы можете отфильтровать все несоответствующие разделы в методе transform
.По-прежнему существует проблема отправки данных об исходящих в определенный раздел.Если вы используете приведенный выше код как есть, то связыватель отправит данные в произвольные разделы (хотя это может быть хорошей возможностью для связующего).Однако в этом случае вы можете напрямую использовать потоки Kafka, если хотите, чтобы исходящие записи попадали в детерминированные разделы.Смотри ниже.
@StreamListener("requesti")
public void process(KStream<UUID, Account> events) {
final KStream<UUID, Account> transformed = events.transform(() -> new Transformer<UUID, Account, KeyValue<UUID, Account>>() {
ProcessorContext context;
@Override
public void init(ProcessorContext context) {
this.context = context;
}
@Override
public KeyValue<UUID, Account> transform(UUID key, Account value) {
if (this.context.partition() == 1) {
//your processing logic
return KeyValue.pair(key, value);
}
return null;
}
@Override
public void close() {
}
});
transformed.to("outputTopic", Produced.with(new JsonSerde<>(), new JsonSerde<>(), new CustomStreamPartitioner()));
}
class CustomStreamPartitioner implements StreamPartitioner<UUID, Account> {
@Override
public Integer partition(String topic, UUID key, Account value, int numPartitions) {
return 1; //change to the right partition based on the key.
}
}