Как реализовать пользовательский раздел Кафки с использованием весеннего облачного потока - PullRequest
1 голос
/ 20 июня 2019

Я пытаюсь реализовать пользовательский разделитель Kafka, используя привязки весеннего потока облака. Я хотел бы просто разделить пользовательскую тему на разделы и ничего не делать с темой компании (в этом случае Kafka будет использовать DefaultPartitioner).

Моя конфигурация привязок:

spring:
  cloud:
    stream:
      bindings:
        comp-out:
          destination: company
          contentType: application/json
        user-out:
          destination: user
          contentType: application/json

Согласно справочному документу: https://cloud.spring.io/spring-cloud-static/spring-cloud-stream-binder-kafka/2.1.0.RC4/single/spring-cloud-stream-binder-kafka.html#_partitioning_with_the_kafka_binder Я изменил конфигурацию так:

spring:
  cloud:
    stream:
      bindings:
        comp-out:
          destination: company
          contentType: application/json
        user-out:
          destination: user
          contentType: application/json
		      producer:
            partitioned: true
            partitionSelectorClass: config.UserPartitioner

Я отправляю сообщение в поток, используя это:

public void postUserStream(User user) throws ServiceException {
        try {
            LOG.info("Posting User {} into Kafka stream...", user);
            MessageChannel messageChannel = messageStreams.outboundUser();
            messageChannel
                    .send(MessageBuilder.withPayload(user)
                            .setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON).build());
        } catch (Exception ex) {
            LOG.error("Error while populating User stream into Kafka.. ", ex);
            throw ex;
        }
    }

Мой класс UserPartitioner:

public class UserPartitioner extends DefaultPartitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes,
          Cluster cluster) {

        String partitionKey = null;
        if (Objects.nonNull(value)) {
            User user = (User) value;
            partitionKey = String.valueOf(user.getCompanyId()) + "_" + String.valueOf(user.getId());
            keyBytes = partitionKey.getBytes();
        }
        return super.partition(topic, partitionKey, keyBytes, value, valueBytes, cluster);
    }
}

Я получаю следующее исключение:

Описание: Не удалось связать свойства в разделе «spring.cloud.stream.bindings.user-out.producer» с org.springframework.cloud.stream.binder.ProducerProperties:

Property: spring.cloud.stream.bindings.user-out.producer.partitioned
Value: true
Origin: "spring.cloud.stream.bindings.user-out.producer.partitioned" from property source "bootstrapProperties"
Reason: No setter found for property: partitioned

Действие: Обновите конфигурацию вашего приложения

Будет полезна любая справочная ссылка о том, как настроить пользовательский раздел с помощью связывателей сообщений.

Редактировать: на основе документации Пробовал также следующие шаги:

пользователя из: назначение: пользователь contentType: application / json режиссер: partitionKeyExtractorClass: config.SimpleUserPartitioner

@Component
public class SimpleUserPartitioner implements PartitionKeyExtractorStrategy {

	@Override
	public Object extractKey(Message<?> message) {
		if(message.getPayload() instanceof BaseUser) {
			BaseUser user = (BaseUser) message.getPayload();
			return user.getId();
		}
		return 10;
	}
    
}

обновление 2. У меня получилось решение добавить счетчик разделов для привязок и автоматическое добавление в true в связывателе:

spring:
  logging:
    level: info
  cloud:
    stream:
      bindings:
        user-out:
          destination: user
          contentType: application/json
          producer:
            partition-key-expression: headers['partitionKey']
            partition-count: 4

spring:
  cloud:
    stream:
      kafka:
        binder:
          brokers: localhost:9092
          autoAddPartitions: true

Ответы [ 2 ]

1 голос
/ 20 июня 2019

Нет недвижимости partitioned; геттер зависит от других свойств ...

public boolean isPartitioned() {
    return this.partitionKeyExpression != null
            || this.partitionKeyExtractorName != null;
}

partitionSelectorClass: config.UserPartitioner

UserPartitioner - это Кафка Partitioner - он определяет, какие потребители получают какие разделы (на стороне потребителя)

partitionSelectorClass должно быть PartitionSelectorStrategy - оно определяет, какой раздел записи отправляется на (на стороне производителя).

Это совершенно разные объекты.

Если вы действительно хотите настроить способ распределения разделов по экземплярам-потребителям, это касается Kafka и не имеет никакого отношения к Spring.

Кроме того, все привязки потребителей в одном и том же переплете будут использовать один и тот же Partitioner. Вы должны настроить несколько связывателей на разные Partitioner с.

Учитывая ваш вопрос, я думаю, вы просто путаете Partitioner с PartitionSelectorStrategy, и вам нужен последний.

0 голосов
/ 20 июня 2019

Также обратите внимание; partitionSelectorClass. в течение некоторого времени устарела и была удалена в текущем мастере (не будет доступно в 3.0.0) в пользу partitionSelectorName - https://cloud.spring.io/spring-cloud-static/spring-cloud-stream/3.0.0.M1/spring-cloud-stream.html#spring-cloud-stream-overview-partitioning

...