У меня есть продюсер, пишущий в тему Kafka с 100 разделами, и он выбирает раздел по идентификатору пользователя, поэтому сообщения пользователя обязательно обрабатываются в порядке их отправки в очередь.
служба, отвечающая за потребление, имеет 2-10 экземпляров, каждый из которых имеет свою конфигурацию:
spring.cloud.stream.bindings.input.consumer.concurrency=10
spring.cloud.stream.bindings.input.consumer.partitioned=true
Недавно я заметил, что, хотя потребитель запускает для обработки сообщений раздела по порядку,иногда одно сообщение выполняется до следующего за ним, потому что его легче обрабатывать, чем следующее.
Для меня важно поддерживать текущую скорость обработки сервиса, и потому что я не знаком с потокамиМодель весеннего облачного потока, которую я хотел проконсультироваться и попросить у других знаний.Каков наилучший способ гарантировать, что сообщение одного пользователя обрабатывается только после выполнения предыдущих?
- РЕДАКТИРОВАТЬ -
По запросу, более актуальноparams.
- Версия Kafka: 0.10.2.1
- версия Spring-Cloud-Stream: 1.1.0.RELEASE
параметры связующего:
spring.cloud.stream.kafka.bindings.input.consumer.autoCommitOffset=false
spring.cloud.stream.kafka.bindings.input.consumer.autoCommitOnError=true
spring.cloud.stream.kafka.bindings.input.consumer.enableDlq=true
конфигурация потребителя, напечатанная на консоли:
2018-12-11 09:56:51,975 [RMI TCP Connection(6)-127.0.0.1] INFO [AbstractConfig::logAll] - ConsumerConfig values:
metric.reporters = []
metadata.max.age.ms = 300000
partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
reconnect.backoff.ms = 50
sasl.kerberos.ticket.renew.window.factor = 0.8
max.partition.fetch.bytes = 1048576
bootstrap.servers = [localhost:9092]
ssl.keystore.type = JKS
enable.auto.commit = true
sasl.mechanism = GSSAPI
interceptor.classes = null
exclude.internal.topics = true
ssl.truststore.password = null
client.id = consumer-11
ssl.endpoint.identification.algorithm = null
max.poll.records = 2147483647
check.crcs = true
request.timeout.ms = 40000
heartbeat.interval.ms = 3000
auto.commit.interval.ms = 5000
receive.buffer.bytes = 65536
ssl.truststore.type = JKS
ssl.truststore.location = null
ssl.keystore.password = null
fetch.min.bytes = 1
send.buffer.bytes = 131072
value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
group.id =
retry.backoff.ms = 100
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
ssl.trustmanager.algorithm = PKIX
ssl.key.password = null
fetch.max.wait.ms = 500
sasl.kerberos.min.time.before.relogin = 60000
connections.max.idle.ms = 540000
session.timeout.ms = 30000
metrics.num.samples = 2
key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
ssl.protocol = TLS
ssl.provider = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.keystore.location = null
ssl.cipher.suites = null
security.protocol = PLAINTEXT
ssl.keymanager.algorithm = SunX509
metrics.sample.window.ms = 30000
auto.offset.reset = latest
конфигурация производителя, напечатанная на консоли:
2018-12-11 09:56:52,439 [-kafka-listener-1] INFO [AbstractConfig::logAll] - ProducerConfig values:
metric.reporters = []
metadata.max.age.ms = 300000
reconnect.backoff.ms = 50
sasl.kerberos.ticket.renew.window.factor = 0.8
bootstrap.servers = [localhost:9092]
ssl.keystore.type = JKS
sasl.mechanism = GSSAPI
max.block.ms = 60000
interceptor.classes = null
ssl.truststore.password = null
client.id = producer-5
ssl.endpoint.identification.algorithm = null
request.timeout.ms = 30000
acks = 1
receive.buffer.bytes = 32768
ssl.truststore.type = JKS
retries = 0
ssl.truststore.location = null
ssl.keystore.password = null
send.buffer.bytes = 131072
compression.type = none
metadata.fetch.timeout.ms = 60000
retry.backoff.ms = 100
sasl.kerberos.kinit.cmd = /usr/bin/kinit
buffer.memory = 33554432
timeout.ms = 30000
key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
ssl.trustmanager.algorithm = PKIX
block.on.buffer.full = false
ssl.key.password = null
sasl.kerberos.min.time.before.relogin = 60000
connections.max.idle.ms = 540000
max.in.flight.requests.per.connection = 5
metrics.num.samples = 2
ssl.protocol = TLS
ssl.provider = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
batch.size = 16384
ssl.keystore.location = null
ssl.cipher.suites = null
security.protocol = PLAINTEXT
max.request.size = 1048576
value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
ssl.keymanager.algorithm = SunX509
metrics.sample.window.ms = 30000
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
linger.ms = 0