Kafka - потребительский процесс с параллелизмом - PullRequest
0 голосов
/ 10 декабря 2018

У меня есть продюсер, пишущий в тему Kafka с 100 разделами, и он выбирает раздел по идентификатору пользователя, поэтому сообщения пользователя обязательно обрабатываются в порядке их отправки в очередь.

служба, отвечающая за потребление, имеет 2-10 экземпляров, каждый из которых имеет свою конфигурацию:

spring.cloud.stream.bindings.input.consumer.concurrency=10
spring.cloud.stream.bindings.input.consumer.partitioned=true

Недавно я заметил, что, хотя потребитель запускает для обработки сообщений раздела по порядку,иногда одно сообщение выполняется до следующего за ним, потому что его легче обрабатывать, чем следующее.

Для меня важно поддерживать текущую скорость обработки сервиса, и потому что я не знаком с потокамиМодель весеннего облачного потока, которую я хотел проконсультироваться и попросить у других знаний.Каков наилучший способ гарантировать, что сообщение одного пользователя обрабатывается только после выполнения предыдущих?

- РЕДАКТИРОВАТЬ -

По запросу, более актуальноparams.

  • Версия Kafka: 0.10.2.1
  • версия Spring-Cloud-Stream: 1.1.0.RELEASE

параметры связующего:

spring.cloud.stream.kafka.bindings.input.consumer.autoCommitOffset=false
spring.cloud.stream.kafka.bindings.input.consumer.autoCommitOnError=true
spring.cloud.stream.kafka.bindings.input.consumer.enableDlq=true

конфигурация потребителя, напечатанная на консоли:

2018-12-11 09:56:51,975 [RMI TCP Connection(6)-127.0.0.1] INFO  [AbstractConfig::logAll] - ConsumerConfig values: 
    metric.reporters = []
    metadata.max.age.ms = 300000
    partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
    reconnect.backoff.ms = 50
    sasl.kerberos.ticket.renew.window.factor = 0.8
    max.partition.fetch.bytes = 1048576
    bootstrap.servers = [localhost:9092]
    ssl.keystore.type = JKS
    enable.auto.commit = true
    sasl.mechanism = GSSAPI
    interceptor.classes = null
    exclude.internal.topics = true
    ssl.truststore.password = null
    client.id = consumer-11
    ssl.endpoint.identification.algorithm = null
    max.poll.records = 2147483647
    check.crcs = true
    request.timeout.ms = 40000
    heartbeat.interval.ms = 3000
    auto.commit.interval.ms = 5000
    receive.buffer.bytes = 65536
    ssl.truststore.type = JKS
    ssl.truststore.location = null
    ssl.keystore.password = null
    fetch.min.bytes = 1
    send.buffer.bytes = 131072
    value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
    group.id = 
    retry.backoff.ms = 100
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    ssl.trustmanager.algorithm = PKIX
    ssl.key.password = null
    fetch.max.wait.ms = 500
    sasl.kerberos.min.time.before.relogin = 60000
    connections.max.idle.ms = 540000
    session.timeout.ms = 30000
    metrics.num.samples = 2
    key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
    ssl.protocol = TLS
    ssl.provider = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.keystore.location = null
    ssl.cipher.suites = null
    security.protocol = PLAINTEXT
    ssl.keymanager.algorithm = SunX509
    metrics.sample.window.ms = 30000
    auto.offset.reset = latest

конфигурация производителя, напечатанная на консоли:

2018-12-11 09:56:52,439 [-kafka-listener-1] INFO  [AbstractConfig::logAll] - ProducerConfig values: 
    metric.reporters = []
    metadata.max.age.ms = 300000
    reconnect.backoff.ms = 50
    sasl.kerberos.ticket.renew.window.factor = 0.8
    bootstrap.servers = [localhost:9092]
    ssl.keystore.type = JKS
    sasl.mechanism = GSSAPI
    max.block.ms = 60000
    interceptor.classes = null
    ssl.truststore.password = null
    client.id = producer-5
    ssl.endpoint.identification.algorithm = null
    request.timeout.ms = 30000
    acks = 1
    receive.buffer.bytes = 32768
    ssl.truststore.type = JKS
    retries = 0
    ssl.truststore.location = null
    ssl.keystore.password = null
    send.buffer.bytes = 131072
    compression.type = none
    metadata.fetch.timeout.ms = 60000
    retry.backoff.ms = 100
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    buffer.memory = 33554432
    timeout.ms = 30000
    key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    ssl.trustmanager.algorithm = PKIX
    block.on.buffer.full = false
    ssl.key.password = null
    sasl.kerberos.min.time.before.relogin = 60000
    connections.max.idle.ms = 540000
    max.in.flight.requests.per.connection = 5
    metrics.num.samples = 2
    ssl.protocol = TLS
    ssl.provider = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    batch.size = 16384
    ssl.keystore.location = null
    ssl.cipher.suites = null
    security.protocol = PLAINTEXT
    max.request.size = 1048576
    value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
    ssl.keymanager.algorithm = SunX509
    metrics.sample.window.ms = 30000
    partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
    linger.ms = 0

1 Ответ

0 голосов
/ 10 декабря 2018

Разделы распределены по потокам контейнера.

Если параллелизм контейнера равен 10 и у вас 20 разделов, каждому потребителю (потоку) обычно назначается 2 раздела.

Это гарантируетДоставка заказа внутри раздела.

...