Невозможно обновить метаданные через ____ мс |Java-приложение не может получить метаданные от kafka - PullRequest
0 голосов
/ 11 октября 2018

Я пытался продюсера, которого я создал на Java, отправить сообщение в тему о брокере Kafka.Я могу отправлять сообщения в тему через консоль, т.е. используя kafka-console-producer.sh.Но когда я пытаюсь сделать то же самое с продюсером, которого я создал в Java, я получаю исключение тайм-аута с сообщением, что «невозможно получить метаданные после 100000 мс. Я прилагаю код производителя и server.properties для kafka здесь

getProducer ():

private synchronized Producer<String, String> getProducer() {
    if (!producer.isPresent()) {
        Properties prodProps = new Properties();
        prodProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092,127.0.0.1:9092");
        prodProps.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 100000);
        prodProps.put(ProducerConfig.ACKS_CONFIG, "all");
        prodProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        prodProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        setProducer(new KafkaProducer<>(prodProps));
    }
    return producer.get();
}

publishPayload ():

private Future<RecordMetadata> publishPayload(DataObject dataObject) {
    String topic = //topic name;
    String key = // unique ID;
    String payload = // String payload;
    return getProducer().send(new ProducerRecord<>(topic, key, payload));
}

server.properties

# The id of the broker. This must be set to a unique integer for each 
broker.
broker.id=0

############################# Socket Server Settings 
#############################

listeners=PLAINTEXT://127.0.0.1:9092
advertised.listeners=PLAINTEXT://127.0.0.1:9092

# The port the socket server listens on
port=9092

# Hostname the broker will bind to. If not set, the server will bind to all 
interfaces
host.name=127.0.0.1

# Hostname the broker will advertise to producers and consumers. If not set, 
it uses the
# value for "host.name" if configured.  Otherwise, it will use the value 
returned from
# java.net.InetAddress.getCanonicalHostName().
advertised.host.name=127.0.0.1

# The port to publish to ZooKeeper for clients to use. If this is not set,
# it will publish the same port that the broker binds to.
advertised.port=9092

# The number of threads handling network requests
num.network.threads=3

# The number of threads doing disk I/O
num.io.threads=8

# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400

# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400

# The maximum size of a request that the socket server will accept 
(protection against OOM)
socket.request.max.bytes=204857600


############################# Log Basics #############################

# A comma seperated list of directories under which to store log files
log.dirs=/tmp/kafka-logs

# The default number of log partitions per topic. More partitions allow 
greater
# parallelism for consumption, but this will also result in more files 
across
# the brokers.
num.partitions=3

# The number of threads per data directory to be used for log recovery at 
startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs 
located in RAID array.
num.recovery.threads.per.data.dir=1

# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=localhost:2181

Manufacturer.Properties:

# format: host1:port1,host2:port2 ...
metadata.broker.list=localhost:9092

# name of the partitioner class for partitioning events; default partition 
spreads data randomly
#partitioner.class=

# specifies whether the messages are sent asynchronously (async) or 
synchronously (sync)
producer.type=sync

# specify the compression codec for all data generated: none, gzip, snappy, 
lz4.
# the old config values work as well: 0, 1, 2, 3 for none, gzip, snappy, 
lz4, 
respectively
compression.codec=none

# message encoder
serializer.class=kafka.serializer.DefaultEncoder

Пожалуйста, дайте мне знать, если я что-то здесь упускаю и как заставить производителя Java иметь возможность общаться с темой kafka.

...