Spring Cloud Data Flow: данные не проходят по потоку - PullRequest
1 голос
/ 02 августа 2020

У меня есть DSL (показанный ниже), который заканчивается приложением "log", поэтому JSON, созданный источником jdbc, должен попадать в журнал, но этого не происходит.

Поставщик (supplier) читает очередь из базы данных и создаёт массив JSON из строк.

Если я включу ведение журнала, будет выведено SybaseSupplierConfiguration.this.logger.debug("Json: {}", json);.

Почему он не попадает в «журнал»?

Пока что я пробовал:

  • Понижена версия Spring Boot до 2.2.9 (использовалась 2.3.2)
  • Исправлен результат возврата jsonSupplier (в строку json)
  • Отключено prometheus / grafana
  • Явно настроен интервал опроса: spring.cloud.stream.poller.fixed-delay=10
  • Использованы binder RabbitMQ и соответствующий Docker-образ
  • Предложил немного выпивки богу Spring Cloud Data Flow.

Ничего из этого не помогло.

docker:

export DATAFLOW_VERSION=2.6.0
export SKIPPER_VERSION=2.5.0
docker-compose -f ./docker-compose.yml -f ./docker-compose-prometheus.yml up -d

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>company-cloud-dataflow-apps</artifactId>
        <groupId>br.com.company.cloud.dataflow.apps</groupId>
        <version>1.0.0-SNAPSHOT</version>
        <relativePath>../pom.xml</relativePath>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>jdbc-sybase-supplier</artifactId>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-test-support</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>${lombok.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-configuration-processor</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud.stream.app</groupId>
            <artifactId>app-starters-micrometer-common</artifactId>
            <version>${app-starters-micrometer-common.version}</version>
        </dependency>
        <dependency>
            <groupId>io.micrometer</groupId>
            <artifactId>micrometer-registry-prometheus</artifactId>
        </dependency>
        <dependency>
            <groupId>io.micrometer.prometheus</groupId>
            <artifactId>prometheus-rsocket-spring</artifactId>
            <version>${prometheus-rsocket.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-kafka</artifactId>
        </dependency>

        <dependency>
            <groupId>net.sourceforge.jtds</groupId>
            <artifactId>jtds</artifactId>
            <version>1.3.1</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-jdbc</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-integration</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-jdbc</artifactId>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.datatype</groupId>
            <artifactId>jackson-datatype-jdk8</artifactId>
            <version>${jackson.version}</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.datatype</groupId>
            <artifactId>jackson-datatype-jsr310</artifactId>
            <version>${jackson.version}</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.module</groupId>
            <artifactId>jackson-module-parameter-names</artifactId>
            <version>${jackson.version}</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.module</groupId>
            <artifactId>jackson-module-jaxb-annotations</artifactId>
            <version>${jackson.version}</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>

конфигурация:

....    
spring.cloud.stream.function.bindings.jsonSupplier-out-0=output
spring.cloud.function.definition=jsonSupplier

Реализация:

настройка оболочки:

app register --name jdbc-postgresql-sink --type sink --uri maven://br.com.company.cloud.dataflow.apps:jdbc-postgresql-sink:1.0.0-SNAPSHOT --force
app register --name jdbc-sybase-supplier --type source --uri maven://br.com.company.cloud.dataflow.apps:jdbc-sybase-supplier:1.0.0-SNAPSHOT --force

stream create --name sybase_to_pgsql --definition "jdbc-sybase-supplier | log "
stream deploy --name sybase_to_pgsql

журнал:

....
2020-08-02 00:40:18.644  INFO 81 --- [           main] o.s.b.a.e.web.EndpointLinksResolver      : Exposing 0 endpoint(s) beneath base path '/actuator'
2020-08-02 00:40:18.793  INFO 81 --- [           main] o.s.i.endpoint.EventDrivenConsumer       : Adding {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
2020-08-02 00:40:18.793  INFO 81 --- [           main] o.s.i.channel.PublishSubscribeChannel    : Channel 'application.errorChannel' has 1 subscriber(s).
2020-08-02 00:40:18.793  INFO 81 --- [           main] o.s.i.endpoint.EventDrivenConsumer       : started bean '_org.springframework.integration.errorLogger'
2020-08-02 00:40:18.793  INFO 81 --- [           main] o.s.i.endpoint.EventDrivenConsumer       : Adding {router} as a subscriber to the 'jsonSupplier_integrationflow.channel#0' channel
2020-08-02 00:40:18.793  INFO 81 --- [           main] o.s.integration.channel.DirectChannel    : Channel 'application.jsonSupplier_integrationflow.channel#0' has 1 subscriber(s).
2020-08-02 00:40:18.794  INFO 81 --- [           main] o.s.i.endpoint.EventDrivenConsumer       : started bean 'jsonSupplier_integrationflow.org.springframework.integration.config.ConsumerEndpointFactoryBean#0'
2020-08-02 00:40:18.795  INFO 81 --- [           main] o.s.c.s.binder.DefaultBinderFactory      : Creating binder: kafka
2020-08-02 00:40:19.235  INFO 81 --- [           main] o.s.c.s.binder.DefaultBinderFactory      : Caching the binder: kafka
2020-08-02 00:40:19.235  INFO 81 --- [           main] o.s.c.s.binder.DefaultBinderFactory      : Retrieving cached binder: kafka
2020-08-02 00:40:19.362  INFO 81 --- [           main] o.s.c.s.b.k.p.KafkaTopicProvisioner      : Using kafka topic for outbound: sybase_to_pgsql.jdbc-sybase-supplier
2020-08-02 00:40:19.364  INFO 81 --- [           main] o.a.k.clients.admin.AdminClientConfig    : AdminClientConfig values: 
    bootstrap.servers = [PLAINTEXT://kafka-broker:9092]
    client.dns.lookup = default
    client.id = 
    connections.max.idle.ms = 300000
    default.api.timeout.ms = 60000
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    receive.buffer.bytes = 65536
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 30000
    retries = 2147483647
    retry.backoff.ms = 100
    sasl.client.callback.handler.class = null
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.login.callback.handler.class = null
    sasl.login.class = null
    sasl.login.refresh.buffer.seconds = 300
    sasl.login.refresh.min.period.seconds = 60
    sasl.login.refresh.window.factor = 0.8
    sasl.login.refresh.window.jitter = 0.05
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    security.providers = null
    send.buffer.bytes = 131072
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2]
    ssl.endpoint.identification.algorithm = https
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLSv1.2
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS

2020-08-02 00:40:19.572  INFO 81 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 2.5.0
2020-08-02 00:40:19.572  INFO 81 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: 66563e712b0b9f84
2020-08-02 00:40:19.572  INFO 81 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1596328819571
2020-08-02 00:40:20.403  INFO 81 --- [           main] o.a.k.clients.producer.ProducerConfig    : ProducerConfig values: 
    acks = 1
    batch.size = 16384
    bootstrap.servers = [PLAINTEXT://kafka-broker:9092]
    buffer.memory = 33554432
    client.dns.lookup = default
    client.id = producer-1
    compression.type = none
    connections.max.idle.ms = 540000
    delivery.timeout.ms = 120000
    enable.idempotence = false
    interceptor.classes = []
    key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
    linger.ms = 0
    max.block.ms = 60000
    max.in.flight.requests.per.connection = 5
    max.request.size = 1048576
    metadata.max.age.ms = 300000
    metadata.max.idle.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
    receive.buffer.bytes = 32768
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 30000
    retries = 0
    retry.backoff.ms = 100
    sasl.client.callback.handler.class = null
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.login.callback.handler.class = null
    sasl.login.class = null
    sasl.login.refresh.buffer.seconds = 300
    sasl.login.refresh.min.period.seconds = 60
    sasl.login.refresh.window.factor = 0.8
    sasl.login.refresh.window.jitter = 0.05
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    security.providers = null
    send.buffer.bytes = 131072
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2]
    ssl.endpoint.identification.algorithm = https
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLSv1.2
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    transaction.timeout.ms = 60000
    transactional.id = null
    value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer

2020-08-02 00:40:20.477  INFO 81 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 2.5.0
2020-08-02 00:40:20.477  INFO 81 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: 66563e712b0b9f84
2020-08-02 00:40:20.477  INFO 81 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1596328820477
2020-08-02 00:40:20.573  INFO 81 --- [ad | producer-1] org.apache.kafka.clients.Metadata        : [Producer clientId=producer-1] Cluster ID: um9lJtXTQUmURh9cwOkqxA
2020-08-02 00:40:20.574  INFO 81 --- [           main] o.a.k.clients.producer.KafkaProducer     : [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 30000 ms.
2020-08-02 00:40:20.622  INFO 81 --- [           main] o.s.c.s.m.DirectWithAttributesChannel    : Channel 'application.output' has 1 subscriber(s).
2020-08-02 00:40:20.625  INFO 81 --- [           main] o.s.i.e.SourcePollingChannelAdapter      : started bean 'jsonSupplier_integrationflow.org.springframework.integration.config.SourcePollingChannelAdapterFactoryBean#0'
2020-08-02 00:40:20.654  INFO 81 --- [           main] o.s.b.w.embedded.tomcat.TomcatWebServer  : Tomcat started on port(s): 20031 (http) with context path ''
2020-08-02 00:40:20.674  INFO 81 --- [           main] b.c.c.d.a.s.SybaseSupplierConfiguration  : Started SybaseSupplierConfiguration in 12.982 seconds (JVM running for 14.55)
2020-08-02 00:40:21.160  INFO 81 --- [ask-scheduler-1] o.a.k.clients.producer.ProducerConfig    : ProducerConfig values: 
    acks = 1
    batch.size = 16384
    bootstrap.servers = [PLAINTEXT://kafka-broker:9092]
    buffer.memory = 33554432
    client.dns.lookup = default
    client.id = producer-2
    compression.type = none
    connections.max.idle.ms = 540000
    delivery.timeout.ms = 120000
    enable.idempotence = false
    interceptor.classes = []
    key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
    linger.ms = 0
    max.block.ms = 60000
    max.in.flight.requests.per.connection = 5
    max.request.size = 1048576
    metadata.max.age.ms = 300000
    metadata.max.idle.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
    receive.buffer.bytes = 32768
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 30000
    retries = 0
    retry.backoff.ms = 100
    sasl.client.callback.handler.class = null
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.login.callback.handler.class = null
    sasl.login.class = null
    sasl.login.refresh.buffer.seconds = 300
    sasl.login.refresh.min.period.seconds = 60
    sasl.login.refresh.window.factor = 0.8
    sasl.login.refresh.window.jitter = 0.05
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    security.providers = null
    send.buffer.bytes = 131072
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2]
    ssl.endpoint.identification.algorithm = https
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLSv1.2
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    transaction.timeout.ms = 60000
    transactional.id = null
    value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer

2020-08-02 00:40:21.189  INFO 81 --- [ask-scheduler-1] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 2.5.0
2020-08-02 00:40:21.189  INFO 81 --- [ask-scheduler-1] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: 66563e712b0b9f84
2020-08-02 00:40:21.189  INFO 81 --- [ask-scheduler-1] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1596328821189
2020-08-02 00:40:21.271  INFO 81 --- [ad | producer-2] org.apache.kafka.clients.Metadata        : [Producer clientId=producer-2] Cluster ID: um9lJtXTQUmURh9cwOkqxA

1 Ответ

1 голос
/ 03 августа 2020

Если вы используете приложения на основе функций в SCDF, вам придется предоставить дополнительную конфигурацию при развертывании потоков. Обратите внимание на рецепт, в котором описан сценарий развертывания приложений на основе функций.

В частности, обратите внимание на привязки функций, специфичные для каждого приложения, и на переопределение свойств для приложения-источника time-source и приложения-приемника log-sink.

app.time-source.spring.cloud.stream.function.bindings.timeSupplier-out-0=output
app.log-sink.spring.cloud.stream.function.bindings.logConsumer-in-0=input

Привязки каналов ввода / вывода требуют явного сопоставления с привязкой функции, которая есть в вашем настраиваемом источнике. Вам придется переопределить привязку функции вашего настраиваемого источника к каналу output, и тогда все должно заработать.

В v2.6 мы пытаемся автоматизировать эту явную привязку в SCDF, поэтому в будущем будет на одну вещь меньше настраивать.

...