Spring cloud stream kafka binder, подключающийся к docker -составу kafka - PullRequest
0 голосов
/ 15 апреля 2020

Я хочу использовать весенний облачный поток kafka binder. Я запустил 2 брокера kafka и 1 zookeeper со следующим docker -композитным файлом.

version: '3'

services:
  kafka-0:
    image: confluentinc/cp-kafka:5.2.1
    container_name: kafka-0
    restart: always
    ports:
      - "9094:9092"
    expose:
      - "9094"
    environment:
      - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka-0:9094
      - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2182
      - KAFKA_ADVERTISED_HOST_NAME=kafka-0
      - KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1
    depends_on:
      - zookeeper

  kafka-1:
    image: confluentinc/cp-kafka:5.2.1
    container_name: kafka-1
    restart: always
    ports:
      - "9095:9093"
    expose:
      - "9095"
    environment:
      - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka-1:9095
      - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2182
      - KAFKA_ADVERTISED_HOST_NAME=kafka-1
      - KAFKA_ADVERTISED_HOST_NAME=127.0.0.1
      - KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1
    depends_on:
      - zookeeper

 zookeeper:
    image: confluentinc/cp-zookeeper:5.3.1
    container_name: zookeeper
    ports:
      - "2182:2181"
    expose:
      - "2182"
    environment:
      - ZOOKEEPER_CLIENT_PORT=2182

В моем весеннем загрузочном приложении мой файл свойств имеет вид

spring.cloud.stream.kafka.binder.brokers=127.0.0.1:9094,127.0.0.1:9095

Мое весеннее облако Потоковый исходный класс выглядит следующим образом:

@EnableScheduling
@EnableBinding(Source.class)
public class UsageDetailSender {

    @Autowired
    private Source source;

    private String[] users = {"Glenn", "Sabby", "Mark", "Janne", "Ilaya"};

    @Scheduled(fixedDelay = 1000)
    public void sendEvents() {
        UsageDetail usageDetail = new UsageDetail();
        usageDetail.setUserId(this.users[new Random().nextInt(5)]);
        usageDetail.setDuration(new Random().nextInt(300));
        usageDetail.setData(new Random().nextInt(700));
        this.source.output().send(MessageBuilder.withPayload(usageDetail).build());
    }
}

Из значений AdminClientConfig кажется, что я могу подключиться к двум брокерам kafka.

bootstrap.servers = [127.0.0.1:9094, 127.0.0.1:9095]
    client.id = 
    connections.max.idle.ms = 300000
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    receive.buffer.bytes = 65536
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 120000
    retries = 5
    retry.backoff.ms = 100
    sasl.client.callback.handler.class = null
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.login.callback.handler.class = null
    sasl.login.class = null
    sasl.login.refresh.buffer.seconds = 300
    sasl.login.refresh.min.period.seconds = 60
    sasl.login.refresh.window.factor = 0.8
    sasl.login.refresh.window.jitter = 0.05
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    send.buffer.bytes = 131072
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = https
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS

Однако я получаю эту ошибку. Как я могу это исправить?

org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment.

2020-04-15 20:32:20.303  INFO 10384 --- [| adminclient-1] o.a.k.c.a.i.AdminMetadataManager         : [AdminClient clientId=adminclient-1] Metadata update failed

org.apache.kafka.common.errors.TimeoutException: The AdminClient thread has exited.

2020-04-15 20:32:20.314 ERROR 10384 --- [           main] o.s.cloud.stream.binding.BindingService  : Failed to create producer binding; retrying in 30 seconds

org.springframework.cloud.stream.provisioning.ProvisioningException: Provisioning exception; nested exception is java.util.concurrent.TimeoutException
    at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.createTopic(KafkaTopicProvisioner.java:290) ~[spring-cloud-stream-binder-kafka-core-2.1.2.RELEASE.jar:2.1.2.RELEASE]
    at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.provisionProducerDestination(KafkaTopicProvisioner.java:137) ~[spring-cloud-stream-binder-kafka-core-2.1.2.RELEASE.jar:2.1.2.RELEASE]
    at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.provisionProducerDestination(KafkaTopicProvisioner.java:78) ~[spring-cloud-stream-binder-kafka-core-2.1.2.RELEASE.jar:2.1.2.RELEASE]
    at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.doBindProducer(AbstractMessageChannelBinder.java:193) ~[spring-cloud-stream-2.1.2.RELEASE.jar:2.1.2.RELEASE]
    at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.doBindProducer(AbstractMessageChannelBinder.java:97) ~[spring-cloud-stream-2.1.2.RELEASE.jar:2.1.2.RELEASE]
    at org.springframework.cloud.stream.binder.AbstractBinder.bindProducer(AbstractBinder.java:151) ~[spring-cloud-stream-2.1.2.RELEASE.jar:2.1.2.RELEASE]
    at org.springframework.cloud.stream.binding.BindingService.doBindProducer(BindingService.java:268) ~[spring-cloud-stream-2.1.2.RELEASE.jar:2.1.2.RELEASE]
    at org.springframework.cloud.stream.binding.BindingService.bindProducer(BindingService.java:243) ~[spring-cloud-stream-2.1.2.RELEASE.jar:2.1.2.RELEASE]
    at org.springframework.cloud.stream.binding.BindableProxyFactory.createAndBindOutputs(BindableProxyFactory.java:287) ~[spring-cloud-stream-2.1.2.RELEASE.jar:2.1.2.RELEASE]
    at org.springframework.cloud.stream.binding.OutputBindingLifecycle.doStartWithBindable(OutputBindingLifecycle.java:58) ~[spring-cloud-stream-2.1.2.RELEASE.jar:2.1.2.RELEASE]
    at java.base/java.util.LinkedHashMap$LinkedValues.forEach(LinkedHashMap.java:608) ~[na:na]
    at org.springframework.cloud.stream.binding.AbstractBindingLifecycle.start(AbstractBindingLifecycle.java:48) ~[spring-cloud-stream-2.1.2.RELEASE.jar:2.1.2.RELEASE]
    at org.springframework.cloud.stream.binding.OutputBindingLifecycle.start(OutputBindingLifecycle.java:34) ~[spring-cloud-stream-2.1.2.RELEASE.jar:2.1.2.RELEASE]
    at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:182) ~[spring-context-5.1.6.RELEASE.jar:5.1.6.RELEASE]
    at org.springframework.context.support.DefaultLifecycleProcessor.access$200(DefaultLifecycleProcessor.java:53) ~[spring-context-5.1.6.RELEASE.jar:5.1.6.RELEASE]
    at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:360) ~[spring-context-5.1.6.RELEASE.jar:5.1.6.RELEASE]
    at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:158) ~[spring-context-5.1.6.RELEASE.jar:5.1.6.RELEASE]
    at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:122) ~[spring-context-5.1.6.RELEASE.jar:5.1.6.RELEASE]
    at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:893) ~[spring-context-5.1.6.RELEASE.jar:5.1.6.RELEASE]
    at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.finishRefresh(ServletWebServerApplicationContext.java:163) ~[spring-boot-2.1.4.RELEASE.jar:2.1.4.RELEASE]
    at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:552) ~[spring-context-5.1.6.RELEASE.jar:5.1.6.RELEASE]
    at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:142) ~[spring-boot-2.1.4.RELEASE.jar:2.1.4.RELEASE]
    at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:775) ~[spring-boot-2.1.4.RELEASE.jar:2.1.4.RELEASE]
    at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:397) ~[spring-boot-2.1.4.RELEASE.jar:2.1.4.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:316) ~[spring-boot-2.1.4.RELEASE.jar:2.1.4.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1260) ~[spring-boot-2.1.4.RELEASE.jar:2.1.4.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1248) ~[spring-boot-2.1.4.RELEASE.jar:2.1.4.RELEASE]
    at io.spring.dataflow.sample.usagedetailsender.UsageDetailSenderApplication.main(UsageDetailSenderApplication.java:10) ~[classes/:na]
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:na]
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:na]
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
    at java.base/java.lang.reflect.Method.invoke(Method.java:567) ~[na:na]
    at com.intellij.rt.execution.application.AppMainV2.main(AppMainV2.java:131) ~[idea_rt.jar:na]
Caused by: java.util.concurrent.TimeoutException: null
    at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:108) ~[kafka-clients-2.0.1.jar:na]
    at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:274) ~[kafka-clients-2.0.1.jar:na]
    at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.createTopicAndPartitions(KafkaTopicProvisioner.java:323) ~[spring-cloud-stream-binder-kafka-core-2.1.2.RELEASE.jar:2.1.2.RELEASE]
    at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.createTopicIfNecessary(KafkaTopicProvisioner.java:299) ~[spring-cloud-stream-binder-kafka-core-2.1.2.RELEASE.jar:2.1.2.RELEASE]
    at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.createTopic(KafkaTopicProvisioner.java:281) ~[spring-cloud-stream-binder-kafka-core-2.1.2.RELEASE.jar:2.1.2.RELEASE]
    ... 32 common frames omitted

1 Ответ

0 голосов
/ 15 апреля 2020

Вам нужен рекламируемый слушатель на localhost:....

См. здесь .

Это брокер cp-all-in-one на 5.4. 1 пост

  broker:
    image: ${REPOSITORY}/cp-server:${TAG}
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
      CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:29092
      CONFLUENT_METRICS_REPORTER_ZOOKEEPER_CONNECT: zookeeper:2181
      CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
      CONFLUENT_METRICS_ENABLE: 'true'
      CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'
...