Я пытаюсь реализовать пользовательский разделитель Kafka, используя привязки весеннего потока облака. Я хотел бы просто разделить пользовательскую тему на разделы и ничего не делать с темой компании (в этом случае Kafka будет использовать DefaultPartitioner).
Моя конфигурация привязок:
spring:
cloud:
stream:
bindings:
comp-out:
destination: company
contentType: application/json
user-out:
destination: user
contentType: application/json
Согласно справочному документу: https://cloud.spring.io/spring-cloud-static/spring-cloud-stream-binder-kafka/2.1.0.RC4/single/spring-cloud-stream-binder-kafka.html#_partitioning_with_the_kafka_binder
Я изменил конфигурацию так:
spring:
cloud:
stream:
bindings:
comp-out:
destination: company
contentType: application/json
user-out:
destination: user
contentType: application/json
producer:
partitioned: true
partitionSelectorClass: config.UserPartitioner
Я отправляю сообщение в поток, используя это:
public void postUserStream(User user) throws ServiceException {
try {
LOG.info("Posting User {} into Kafka stream...", user);
MessageChannel messageChannel = messageStreams.outboundUser();
messageChannel
.send(MessageBuilder.withPayload(user)
.setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON).build());
} catch (Exception ex) {
LOG.error("Error while populating User stream into Kafka.. ", ex);
throw ex;
}
}
Мой класс UserPartitioner:
public class UserPartitioner extends DefaultPartitioner {
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes,
Cluster cluster) {
String partitionKey = null;
if (Objects.nonNull(value)) {
User user = (User) value;
partitionKey = String.valueOf(user.getCompanyId()) + "_" + String.valueOf(user.getId());
keyBytes = partitionKey.getBytes();
}
return super.partition(topic, partitionKey, keyBytes, value, valueBytes, cluster);
}
}
Я получаю следующее исключение:
Описание:
Не удалось связать свойства в разделе «spring.cloud.stream.bindings.user-out.producer» с org.springframework.cloud.stream.binder.ProducerProperties:
Property: spring.cloud.stream.bindings.user-out.producer.partitioned
Value: true
Origin: "spring.cloud.stream.bindings.user-out.producer.partitioned" from property source "bootstrapProperties"
Reason: No setter found for property: partitioned
Действие:
Обновите конфигурацию вашего приложения
Будет полезна любая справочная ссылка о том, как настроить пользовательский раздел с помощью связывателей сообщений.
Редактировать: на основе документации Пробовал также следующие шаги:
пользователя из:
назначение: пользователь
contentType: application / json
режиссер:
partitionKeyExtractorClass: config.SimpleUserPartitioner
@Component
public class SimpleUserPartitioner implements PartitionKeyExtractorStrategy {
@Override
public Object extractKey(Message<?> message) {
if(message.getPayload() instanceof BaseUser) {
BaseUser user = (BaseUser) message.getPayload();
return user.getId();
}
return 10;
}
}
обновление 2. У меня получилось решение добавить счетчик разделов для привязок и автоматическое добавление в true в связывателе:
spring:
logging:
level: info
cloud:
stream:
bindings:
user-out:
destination: user
contentType: application/json
producer:
partition-key-expression: headers['partitionKey']
partition-count: 4
spring:
cloud:
stream:
kafka:
binder:
brokers: localhost:9092
autoAddPartitions: true