Создайте Kafka Producer для отправки каждого сообщения из списка - PullRequest
1 голос
/ 24 октября 2019

У меня в докере работает kafka и zookeeper

Мне нужно отправлять сообщения kafka на kafka с помощью springboot.

Список сообщений:

[[{"id":"0x804f","timestamp":1551684977690}],

[{"id":"1234","timestamp":155168497800}],

[{"id":"39339e82-6bd6-4ab6-9672-21d0df4d34eb","timestamp":1551684977690}],

[{"id":"a3173ca5-4cc4-408b-a058-879a298d6081","timestamp":155168497800}]]

Это то, что я пробовал для примера:

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class Producer {
private Properties properties = new Properties();
String topicName = "tslistsbc";

public Producer(){
    String bootstrapServer = "docker-machineIP:9092";
    String keySerializer = StringSerializer.class.getName();
    String valueSerializer = StringSerializer.class.getName();
    String producerId = "simpleProducer";
    int retries = 2;

    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializer);
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer);
    properties.put(ProducerConfig.CLIENT_ID_CONFIG, producerId);
    properties.put(ProducerConfig.RETRIES_CONFIG, retries);

    KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
    KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
    String value = "sample list"
    ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topicName, "1",value);
    kafkaProducer.send(producerRecord);
    kafkaProducer.close();
}

Изображение Docker: Эти контейнеры работают на док-машине

 zookeeper:
    build: ../components/zookeeper
    image: xxxx:${ZOOKEEPER}
    container_name: zookeeper
    ports:
      - 2181:2181
    restart: unless-stopped

  kafka:
    build: ../components/kafka
    image: xxx:${EMD_KAFKA}
    container_name: image-kafka
    environment:
      KAFKA_ADVERTISED_HOST_NAME: 192.168.99.100
      KAFKA_CREATE_TOPICS: "tslist:1:1,topic:1:1"
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_MESSAGE_MAX_BYTES: "15728640"
    ports:
      - 9092:9092
    depends_on:
      - zookeeper
    restart: unless-stopped

Сообщение об ошибке

SLF4J: Failed toString() invocation on an object of type [org.apache.kafka.clients.NodeApiVersions]
Reported exception:
java.lang.NullPointerException
    at org.apache.kafka.clients.NodeApiVersions.apiVersionToText(NodeApiVersions.java:167)

Itsне работает, сообщение не отправляется.

1 Ответ

1 голос
/ 25 октября 2019

Поскольку вы пытаетесь получить доступ к одному из контейнеров создания докеров извне из запускаемых сервисов создания докеров (например, запустив ваш сервис в вашей IDE), вам нужно добавить имя контейнера докера в хосты вашей системы file.

В Linux / Mac файл hosts находится по адресу / etc / hosts , а в Windows - по адресу c: \ windows \ system32 \ drivers \ etc \ hosts . В соответствии с полученной ошибкой ваш файл hosts должен содержать следующую запись:

127.0.0.1 image-kafka

Что касается исключения

SLF4J: Failed toString() invocation on an object of type 
[org.apache.kafka.clients.NodeApiVersions]
Reported exception:
java.lang.NullPointerException
    at org.apache.kafka.clients.NodeApiVersions.apiVersionToText(NodeApiVersions.java:167)

, это связано с несоответствием между используемой версией Kafka Server и версией Kafka Client ( проверьте ответ здесь ).

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...