раздел apache kafka, java - PullRequest
       4

раздел apache kafka, java

0 голосов
/ 03 июля 2018

Я очень новичок в apache kafka, и я пытаюсь сделать разделы kafka при отправке моей строки в kafka,

делает точно так же, как здесь

 public class Producer {
    public static void main(String[] argv)throws Exception {

           String topicName = "test";

           //Configure the Producer
           Properties configProperties = new Properties();
           configProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.4.226:9092");
           configProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.ByteArraySerializer");
           configProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
           configProperties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,CountryPartitioner.class.getCanonicalName());
           configProperties.put("partitions.0","USA");
           configProperties.put("partitions.1","India"); 

           org.apache.kafka.clients.producer.Producer producer = new KafkaProducer(configProperties);
               ProducerRecord<String, String> rec = new ProducerRecord<String, String>(topicName,"message111");
               producer.send(rec, new Callback() {
                   public void onCompletion(RecordMetadata metadata, Exception exception) {
                       System.out.println("Message sent to topic ->" + metadata.topic() +" stored at offset->" + metadata.offset());
                   }
               });
           producer.close();
       }


}

Теперь я получаю исключение, подобное этому

2018-07-03 17:52:12 ERROR RecordBatch:102 - Error executing user-provided callback on message for topic-partition test-2:
java.lang.NullPointerException
    at com.spnotes.kafka.partition.Producer$1.onCompletion(Producer.java:29)
    at org.apache.kafka.clients.producer.internals.RecordBatch.done(RecordBatch.java:99)
    at org.apache.kafka.clients.producer.internals.RecordBatch.maybeExpire(RecordBatch.java:136)
    at org.apache.kafka.clients.producer.internals.RecordAccumulator.abortExpiredBatches(RecordAccumulator.java:220)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:192)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:141)
    at java.lang.Thread.run(Thread.java:748)

что я делаю не так? Заранее спасибо.

1 Ответ

0 голосов
/ 04 июля 2018

РЕДАКТИРОВАТЬ 3: Единственное, что вам нужно создать несколько разделов, см. Эти шаги. Сначала мы создаем файл конфигурации для каждого из брокеров (в Windows вместо этого используйте команду копирования)

> cp config/server.properties config/server-1.properties
> cp config/server.properties config/server-2.properties

Теперь отредактируйте эти новые файлы и установите следующие свойства:

config/server-1.properties:
    broker.id=1
    listeners=PLAINTEXT://:9093
    log.dir=/tmp/kafka-logs-1

config/server-2.properties:
    broker.id=2
    listeners=PLAINTEXT://:9094
    log.dir=/tmp/kafka-logs-2

У нас уже есть Zookeeper и наш единственный узел запущен, поэтому нам просто нужно запустить два новых узла:

> bin/kafka-server-start.sh config/server-1.properties &
> bin/kafka-server-start.sh config/server-2.properties &

Теперь создайте новую тему с коэффициентом репликации три вместе с 3 разделами это решило бы проблему:

> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 3 --topic my-replicated-topic
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...