Разделы Spring Cloud Stream Разделы чтения / записи KStream - PullRequest
0 голосов
/ 26 февраля 2019

У меня есть несколько микросервисов и интерфейс API, мне нравится использовать одну и ту же тему для событий, каждое событие домена в отдельном разделе, я смог настроить связыватель Spring Kafka для отправки в другой раздел, используя реализацию

spring.cloud.stream.bindings.<channel>.producer.partition-key- extractor-name= 

PartitionKeyExtractorStrategy

Мой вопрос здесь заключается в том, могу ли я настроить Kinder Binder так, чтобы он мог использовать пользовательский раздел только для @input и @ Output.

Насколько я понимаю, пока

spring.cloud.stream.kafka.streams.bindings.<channel>.producer.configuration.partitioner.class=

но это никогда не настраивается.если есть какой-либо другой способ или я делаю ошибку, пожалуйста, предложите

1 Ответ

0 голосов
/ 26 февраля 2019

Вы детерминировано отправляете записи в определенный раздел?Другими словами, вы знаете фактический раздел для каждого ключа?Если вы укажете только PartitionKeyExtractorStrategy, то механизм связывания будет произвольно выбирать раздел для отправки этой записи.Если вы хотите сделать его детерминированным, вы можете предоставить partitionSelectorClass как свойство (реализовать интерфейс PartitionSelectorStrategy) на стороне вашего производителя.Этот интерфейс позволяет выбрать раздел на основе ключа.Допустим, вы хотите отправить все записи с ключом UUID-1 в раздел 1, и вы кодировали это с помощью реализации PartitionSelectorStrategy.Это означает, что ваш процессор потоков kafka знает, что записи с ключом UUID-1 поступают из раздела 1.С этими допущениями вы можете сделать следующее в вашем процессоре потоков kafka.По сути, это вариант этого ответа для одного из ваших других вопросов.

@StreamListener("requesti")
@SendTo("responseo")
public KStream<UUID,Account> process(KStream<UUID, Account> events) {


        return  events.transform(() -> new Transformer<UUID, Account, KeyValue<UUID, Account>>() {
            ProcessorContext context;

            @Override
            public void init(ProcessorContext context) {
                this.context = context;
            }

            @Override
            public KeyValue<UUID,Account> transform(UUID key, Account value) {
                if (this.context.partition() == 1) {
                    //your processing logic
                    return KeyValue.pair(key, value);
                }
                return null;
            }

            @Override
            public void close() {

            }
        });
    }

С помощью приведенного выше кода вы можете отфильтровать все несоответствующие разделы в методе transform.По-прежнему существует проблема отправки данных об исходящих в определенный раздел.Если вы используете приведенный выше код как есть, то связыватель отправит данные в произвольные разделы (хотя это может быть хорошей возможностью для связующего).Однако в этом случае вы можете напрямую использовать потоки Kafka, если хотите, чтобы исходящие записи попадали в детерминированные разделы.Смотри ниже.

@StreamListener("requesti")
public void process(KStream<UUID, Account> events) {


    final KStream<UUID, Account> transformed = events.transform(() -> new Transformer<UUID, Account, KeyValue<UUID, Account>>() {
            ProcessorContext context;


            @Override
            public void init(ProcessorContext context) {
                this.context = context;
            }

            @Override
            public KeyValue<UUID, Account> transform(UUID key, Account value) {
                if (this.context.partition() == 1) {
                    //your processing logic
                    return KeyValue.pair(key, value);
                }
                return null;
            }

            @Override
            public void close() {

            }
        });


        transformed.to("outputTopic", Produced.with(new JsonSerde<>(), new JsonSerde<>(), new CustomStreamPartitioner()));
    }

    class CustomStreamPartitioner implements StreamPartitioner<UUID, Account> {

        @Override
        public Integer partition(String topic, UUID key, Account value, int numPartitions) {
            return 1; //change to the right partition based on the key.
        }
    }
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...