весенний облачный поток кафка - выбирает объем и пропускает сердцебиение - PullRequest
0 голосов
/ 28 февраля 2019

Я смотрю на сервис весенней загрузки, который читает сообщения от apache kafka, запрашивает записи, указанные в сообщении, из другого сервиса через http, обрабатывает их, сохраняет некоторые данные в базу данных и публикует результаты в другой теме.

это делается через

@StreamListener(Some.INPUT)
@SendTo(Some.OUTPUT)

это делается в нескольких сервисах и в целом работает просто отлично.Единственный набор свойств

spring.cloud.stream.binder.consumer.concurrency=20

сама тема имеет 20 разделов, которые должны соответствовать.

При мониторинге чтений из kafka мы обнаружили очень низкую пропускную способность и странное поведение:

приложение считывает до 500 сообщений одновременно, затем 1-2 минуты ничего.в течение этого времени потребитель неоднократно регистрирует, что он «пропускает тактовые импульсы, потому что раздел был перебалансирован», «переназначает разделы», а иногда даже выдает исключение, говоря, что «не удалось зафиксировать, потому что интервал опроса истек»

Мы пришли к выводу, что это означает, что потребитель получает 500 сообщений, занимает много времени, чтобы обработать все из них, пропускает свое временное окно и, следовательно, не может передать ни одно из 500 сообщений посреднику - который переназначает раздел и повторно отправляетснова и снова одни и те же сообщения.

После просмотра потоков и документов я обнаружил свойство "max.poll.records", но в качестве места для установки этого свойства я нашел противоречивые предложения.

некоторые говорят, чтоустановите его в

spring.cloud.stream.bindings.consumer.<input>.configuration

некоторые говорят

spring.cloud.stream.kafka.binders.consumer-properties

Я попытался установить оба значения в 1, но поведение служб не изменилось.

Как правильно обрабатыватьв случае, если потребитель не может идти в ногу с требуемым интервалом опроса с настройками по умолчанию?

common-yaml:

spring.cloud.stream.default.group=${spring.application.name}

service-yaml

spring:
  clould:
    stream:
      default:
        consumer.headerMode: embeddedHeaders
        producer.headerMode: embeddedHeaders
      bindings:
       someOutput:
         destination: outTopic
       someInput:
         destination: inTopic
           consumer:
             concurrency: 30
      kafka:
        bindings:
          consumer:
            someInput:
              configuarion:
                max.poll.records: 20 # ConsumerConfig ignores this
              consumer:
                enableDlq: true
                configuarion:
                  max.poll.records: 30 # ConsumerConfig ignores this
          someInput:
            configuarion:
              max.poll.records: 20 # ConsumerConfig ignores this
            consumer:
              enableDlq: true
              configuarion:
                max.poll.records: 30 # ConsumerConfig ignores this
        binder:
          consumer-properties:
            max.poll.records: 10 # this gets used first
          configuration:
            max.poll.records: 40 # this get used when the first one is not present

Всегда игнорируется это значение, если не установлено другое свойство, ConsumerConfiguration сохраняет значение по умолчанию 500 для записей максимального опроса

РЕДАКТИРОВАТЬ .: мы стали ближе:

Проблема была связана с пружинным повтором, имеющим набор exponentialBackoffStrategy - и кучей ошибок, которые эффективно останавливали приложение.

Чтоя не понимаю, мы вынудили 200 ошибок, отправив искаженные сообщения в рассматриваемую тему, что приводит к чтению приложения 200, возрасту (со старой конфигурацией повторных попыток) и последующей фиксации всех 200 ошибок одновременно.

Как это имеет смысл, если у нас есть

max.poll.records: 1
concurrency: 1
ackEachRecod = true
enableDlq: true # (which implicitly makes autoCommitOffsets = true according to the docs)

1 Ответ

0 голосов
/ 28 февраля 2019

Это

spring.cloud.stream.kafka.bindings.consumer.<input>.consumer.configuration.max.poll.records
.

См. документацию ...

Kafka Consumer Properties

Следующие свойства доступны только для потребителей Kafka и должны иметь префикс spring.cloud.stream.kafka.bindings.<channelName>.consumer.

...

конфигурация

Карта с ключом / значениемпара, содержащая общие потребительские свойства Kafka.

По умолчанию: пустая карта.

...

Вы также можете увеличить max.poll.interval.ms.

РЕДАКТИРОВАТЬ

Я только что проверил с 2.1.0.RELEASE - и работает, как я описал:

Нет настроек

2019-03-01 08:47:59.560  INFO 44698 --- [           main] o.a.k.clients.consumer.ConsumerConfig    : ConsumerConfig values: 
    ...
    max.poll.records = 500
    ...

Загрузка по умолчанию

spring.kafka.consumer.properties.max.poll.records=42

2019-03-01 08:49:49.197  INFO 45044 --- [           main] o.a.k.clients.consumer.ConsumerConfig    : ConsumerConfig values: 
    ...
    max.poll.records = 42
    ...

Связывание по умолчанию # 1

spring.kafka.consumer.properties.max.poll.records=42
spring.cloud.stream.kafka.binder.consumer-properties.max.poll.records=43

2019-03-01 08:52:11.469  INFO 45842 --- [           main] o.a.k.clients.consumer.ConsumerConfig    : ConsumerConfig values: 
    ...
    max.poll.records = 43
    ...

Связывание по умолчанию # 2

spring.kafka.consumer.properties.max.poll.records=42
spring.cloud.stream.kafka.binder.configuration.max.poll.records=43

2019-03-01 08:54:06.211  INFO 46252 --- [           main] o.a.k.clients.consumer.ConsumerConfig    : ConsumerConfig values: 
    ...
    max.poll.records = 43
    ...

Связывание по умолчанию

spring.kafka.consumer.properties.max.poll.records=42
spring.cloud.stream.kafka.binder.configuration.max.poll.records=43
spring.cloud.stream.kafka.default.consumer.configuration.max.poll.records=44

2019-03-01 09:02:26.004  INFO 47833 --- [           main] o.a.k.clients.consumer.ConsumerConfig    : ConsumerConfig values: 
    ...
    max.poll.records = 44
    ...

Связывание специфическое

spring.kafka.consumer.properties.max.poll.records=42
spring.cloud.stream.kafka.binder.configuration.max.poll.records=43
spring.cloud.stream.kafka.default.consumer.configuration.max.poll.records=44
spring.cloud.stream.kafka.bindings.input.consumer.configuration.max.poll.records=45

2019-03-01 09:05:01.452  INFO 48330 --- [           main] o.a.k.clients.consumer.ConsumerConfig    : ConsumerConfig values: 
    ...
    max.poll.records = 45
    ...

EDIT2

Вот полное тестовое приложение.Я просто создал новое приложение в http://start.spring.io и выбрал «Kafka» и «Cloud Stream».

@SpringBootApplication
@EnableBinding(Sink.class)
public class So54932453Application {

    public static void main(String[] args) {
        SpringApplication.run(So54932453Application.class, args).close();
    }

    @StreamListener(Sink.INPUT)
    public void listen(String in) {

    }

}

и

spring.cloud.stream.bindings.input.group=so54932453

spring.kafka.consumer.properties.max.poll.records=42
spring.cloud.stream.kafka.binder.configuration.max.poll.records=43
spring.cloud.stream.kafka.default.consumer.configuration.max.poll.records=44
spring.cloud.stream.kafka.bindings.input.consumer.configuration.max.poll.records=45

и

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.3.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>net.gprussell</groupId>
    <artifactId>so54932453</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>so54932453</name>
    <description>Demo</description>

    <properties>
        <java.version>1.8</java.version>
        <spring-cloud.version>Greenwich.RELEASE</spring-cloud.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-test-support</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

    <repositories>
        <repository>
            <id>spring-milestones</id>
            <name>Spring Milestones</name>
            <url>https://repo.spring.io/milestone</url>
        </repository>
    </repositories>

</project>
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...