Kafka Producer в Eclipse не отправляет сообщения в тему - PullRequest
1 голос
/ 04 мая 2020

Я не могу отправить сообщения от KafkaProducer с помощью java из eclipse на Windows (хост-ОС) на kafka topi c, работающую в Hortonworks Sandbox. Мой код java ниже

import java.util.Properties;
import java.util.concurrent.Future;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class Producer {

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "sandbox-hdp.hortonworks.com:6667");
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        Future<RecordMetadata> ck = null;
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
        try {
            for (int i = 0; i < 1; i++) {
                System.out.println(i);
                ck = kafkaProducer.send(
                        new ProducerRecord<String, String>("kafkatopic", Integer.toString(i), "test message - " + i));
                kafkaProducer.flush();
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            System.out.println(ck.toString());
            // System.out.println(ck.get().toString()); ->gives null
            kafkaProducer.close();
        }
    }
}

При запуске этого кода java ошибок нет. Он просто печатает индекс сообщения, в данном случае просто 0, а затем завершается, и я не могу чтобы увидеть 0 в консоли-потребителе на интерфейсе cmd песочницы hortonworks.

Это pom. xml зависимость

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.9.0.1</version>
</dependency>

Я добавил localhost к указать на песочницу следующим образом. Это мой файл hosts на windows (ОС хоста)

# Copyright (c) 1993-2009 Microsoft Corp.
#
# This is a sample HOSTS file used by Microsoft TCP/IP for Windows.
#
# This file contains the mappings of IP addresses to host names. Each
# entry should be kept on an individual line. The IP address should
# be placed in the first column followed by the corresponding host name.
# The IP address and the host name should be separated by at least one
# space.
#
# Additionally, comments (such as these) may be inserted on individual
# lines or following the machine name denoted by a '#' symbol.
#
# For example:
#
#      102.54.94.97     rhino.acme.com          # source server
#       38.25.63.10     x.acme.com              # x client host

# localhost name resolution is handled within DNS itself.
#   127.0.0.1       localhost
#   ::1             localhost
127.0.0.1 sandbox-hdp.hortonworks.com

Я создал топи c с именем kafkatopi c, используя следующую команду

/usr/hdp/3.0.1.0-187/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181  --replication-factor 1 --partitions 1 --topic kafkatopic                                                 

Я могу отправлять сообщения от производителя

[root@sandbox-hdp ~]# /usr/hdp/3.0.1.0-187/kafka/bin/kafka-console-producer.sh --broker-list sandbox-hdp.hortonworks.com:6667 --topic kafkatopic                                                                   
>statement1
>statement2
>statement3
>statement4
>statement5

А также могу одновременно видеть сообщения от потребителя в другой вкладке

[root@sandbox-hdp ~]# /usr/hdp/3.0.1.0-187/kafka/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic kafkatopic --from-beginning                                                                      
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].                     
this is statement1                                                                                                                                                                                                 
this is statement2                                                                                                                                                                                                 
this is statement3                                                                                                                                                                                                 
this is statement4                                                                                                                                                                                                 
this is statement5

Я вижу, что сообщения отправлены к topi c от производителя к потребителю по интерфейсу cmd, но я не могу отправлять сообщения извне с Java на Windows (хост-ОС) на kafka topi c в изолированной программной среде hortonworks.

Это consumer.properties

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
# 
#    http://www.apache.org/licenses/LICENSE-2.0
# 
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# see org.apache.kafka.clients.consumer.ConsumerConfig for more details

# list of brokers used for bootstrapping knowledge about the rest of the cluster
# format: host1:port1,host2:port2 ...
bootstrap.servers=localhost:9092

# consumer group id
group.id=test-consumer-group

# What to do when there is no initial offset in Kafka or if the current
# offset does not exist any more on the server: latest, earliest, none
#auto.offset.reset=

Это provider.properties

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# see org.apache.kafka.clients.producer.ProducerConfig for more details

############################# Producer Basics #############################

# list of brokers used for bootstrapping knowledge about the rest of the cluster
# format: host1:port1,host2:port2 ...
bootstrap.servers=localhost:9092

# specify the compression codec for all data generated: none, gzip, snappy, lz4
compression.type=none

# name of the partitioner class for partitioning events; default partition spreads data randomly
#partitioner.class=

# the maximum amount of time the client will wait for the response of a request
#request.timeout.ms=

# how long `KafkaProducer.send` and `KafkaProducer.partitionsFor` will block for
#max.block.ms=

# the producer will wait for up to the given delay to allow other records to be sent so that the sends can be batched together
#linger.ms=

# the maximum size of a request in bytes
#max.request.size=

# the default batch size in bytes when batching multiple records sent to a partition
#batch.size=

# the total bytes of memory the producer can use to buffer records waiting to be sent to the server
#buffer.memory=

Это server.properties

# Generated by Apache Ambari. Sun May  3 19:25:08 2020

auto.create.topics.enable=true
auto.leader.rebalance.enable=true
compression.type=producer
controlled.shutdown.enable=true
controlled.shutdown.max.retries=3
controlled.shutdown.retry.backoff.ms=5000
controller.message.queue.size=10
controller.socket.timeout.ms=30000
default.replication.factor=1
delete.topic.enable=true
external.kafka.metrics.exclude.prefix=kafka.network.RequestMetrics,kafka.server.DelayedOperationPurgatory,kafka.server.BrokerTopicMetrics.BytesRejectedPerSec
external.kafka.metrics.include.prefix=kafka.network.RequestMetrics.ResponseQueueTimeMs.request.OffsetCommit.98percentile,kafka.network.RequestMetrics.ResponseQueueTimeMs.request.Offsets.95percentile,kafka.network.RequestMetrics.ResponseSendTimeMs.request.Fetch.95percentile,kafka.network.RequestMetrics.RequestsPerSec.request
fetch.purgatory.purge.interval.requests=10000
kafka.ganglia.metrics.group=kafka
kafka.ganglia.metrics.host=localhost
kafka.ganglia.metrics.port=8671
kafka.ganglia.metrics.reporter.enabled=true
kafka.metrics.reporters=
kafka.timeline.metrics.host_in_memory_aggregation=
kafka.timeline.metrics.host_in_memory_aggregation_port=
kafka.timeline.metrics.host_in_memory_aggregation_protocol=
kafka.timeline.metrics.hosts=
kafka.timeline.metrics.maxRowCacheSize=10000
kafka.timeline.metrics.port=
kafka.timeline.metrics.protocol=
kafka.timeline.metrics.reporter.enabled=true
kafka.timeline.metrics.reporter.sendInterval=5900
kafka.timeline.metrics.truststore.password=
kafka.timeline.metrics.truststore.path=
kafka.timeline.metrics.truststore.type=
leader.imbalance.check.interval.seconds=300
leader.imbalance.per.broker.percentage=10
listeners=PLAINTEXT://sandbox-hdp.hortonworks.com:6667
log.cleanup.interval.mins=10
log.dirs=/kafka-logs
log.index.interval.bytes=4096
log.index.size.max.bytes=10485760
log.retention.bytes=-1
log.retention.check.interval.ms=600000
log.retention.hours=168
log.roll.hours=168
log.segment.bytes=1073741824
message.max.bytes=1000000
min.insync.replicas=1
num.io.threads=8
num.network.threads=3
num.partitions=1
num.recovery.threads.per.data.dir=1
num.replica.fetchers=1
offset.metadata.max.bytes=4096
offsets.commit.required.acks=-1
offsets.commit.timeout.ms=5000
offsets.load.buffer.size=5242880
offsets.retention.check.interval.ms=600000
offsets.retention.minutes=86400000
offsets.topic.compression.codec=0
offsets.topic.num.partitions=50
offsets.topic.replication.factor=1
offsets.topic.segment.bytes=104857600
port=6667
producer.metrics.enable=false
producer.purgatory.purge.interval.requests=10000
queued.max.requests=500
replica.fetch.max.bytes=1048576
replica.fetch.min.bytes=1
replica.fetch.wait.max.ms=500
replica.high.watermark.checkpoint.interval.ms=5000
replica.lag.max.messages=4000
replica.lag.time.max.ms=10000
replica.socket.receive.buffer.bytes=65536
replica.socket.timeout.ms=30000
sasl.enabled.mechanisms=GSSAPI
sasl.mechanism.inter.broker.protocol=GSSAPI
security.inter.broker.protocol=PLAINTEXT
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
socket.send.buffer.bytes=102400
ssl.client.auth=none
ssl.key.password=
ssl.keystore.location=
ssl.keystore.password=
ssl.truststore.location=
ssl.truststore.password=
zookeeper.connect=sandbox-hdp.hortonworks.com:2181
zookeeper.connection.timeout.ms=25000
zookeeper.session.timeout.ms=30000
zookeeper.sync.time.ms=2000

Это zookeeper.properties

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
# 
#    http://www.apache.org/licenses/LICENSE-2.0
# 
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# the directory where the snapshot is stored.
dataDir=/tmp/zookeeper
# the port at which the clients will connect
clientPort=2181
# disable the per-ip limit on the number of connections since this is a non-production config
maxClientCnxns=0

Я также присоединяю порты kafka, конфигурации и свойства ниже.

enter image description here

1 Ответ

2 голосов
/ 04 мая 2020

kafkaProducer.send фактически добавляет сообщение в буферную память и немедленно возвращается, а затем производитель отправляет сообщения пакетами для эффективности

Производитель состоит из пула буферов пространство, содержащее записи, которые еще не были переданы на сервер, а также поток фонового ввода-вывода, который отвечает за превращение этих записей в запросы и передачу их в кластер. Невозможность закрыть производителя после использования приведет к утечке этих ресурсов.

Метод send () является асинхронным. При вызове он добавляет запись в буфер ожидающих отправки записей и сразу же возвращает. Это позволяет производителю объединять отдельные записи для повышения эффективности.

kafkaProducer.flu sh вы можете использовать flush записей, немедленно доступных в буферной памяти

Вызов этого метода делает сразу доступными для отправки все буферизованные записи (даже если linger.ms больше 0) и блокирует выполнение запросов, связанных с этими записями. Постусловие гриппа sh () состоит в том, что любая ранее отправленная запись будет завершена (например, Future.isDone () == true). Запрос считается выполненным, если он успешно подтвержден в соответствии с заданной вами конфигурацией acks, иначе он приведет к ошибке.

...