Я создал одну тему кафки с одним разделом.
kafka-topics --create --topic files.write --if-not-exists --zookeeper zookeeper:32181 --partitions 1 --replication-factor 1
Можно было бы отправить много сообщений в этой теме.
Но я бы хотел, чтобы один потребитель (для данной группы) обрабатывал эти сообщения по одному.
spring:
application:
name: file-consumer
cloud:
stream:
kafka:
binder:
type: kafka
brokers: localhost
defaultBrokerPort: 29092
defaultZkPort: 32181
configuration:
max.request.size: 300000
max.message.bytes: 300000
bindings:
fileWriteBindingInput:
consumer:
autoCommitOffset: false
bindings:
fileWriteBindingInput:
binder: kafka
destination: files.write
group: ${spring.application.name}
contentType: 'text/plain'
И пример кода Java
@StreamListener(FileBindingProcessor.INPUT_FILE_WRITE)
public void onInputMessage(Message<String> message, @Header(KafkaHeaders.ACKNOWLEDGMENT) Acknowledgment acknowledgment) {
// I Would like here to synchronize the processing of messages one by one
// But, If many messages are pushed to this topic (single partition), they will be processed asynchronously event if I didn't yet acknowledge the current message
acknowledgment.acknowledge();
}
Чтоотсутствует в моей конфигурации?
Я думал, что пока сообщение не подтверждено (смещение не увеличено), ни одно другое сообщение не поступает из того же раздела.