У меня в докере работает kafka и zookeeper
Мне нужно отправлять сообщения kafka на kafka с помощью springboot.
Список сообщений:
[[{"id":"0x804f","timestamp":1551684977690}],
[{"id":"1234","timestamp":155168497800}],
[{"id":"39339e82-6bd6-4ab6-9672-21d0df4d34eb","timestamp":1551684977690}],
[{"id":"a3173ca5-4cc4-408b-a058-879a298d6081","timestamp":155168497800}]]
Это то, что я пробовал для примера:
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
public class Producer {
private Properties properties = new Properties();
String topicName = "tslistsbc";
public Producer(){
String bootstrapServer = "docker-machineIP:9092";
String keySerializer = StringSerializer.class.getName();
String valueSerializer = StringSerializer.class.getName();
String producerId = "simpleProducer";
int retries = 2;
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializer);
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer);
properties.put(ProducerConfig.CLIENT_ID_CONFIG, producerId);
properties.put(ProducerConfig.RETRIES_CONFIG, retries);
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
String value = "sample list"
ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topicName, "1",value);
kafkaProducer.send(producerRecord);
kafkaProducer.close();
}
Изображение Docker: Эти контейнеры работают на док-машине
zookeeper:
build: ../components/zookeeper
image: xxxx:${ZOOKEEPER}
container_name: zookeeper
ports:
- 2181:2181
restart: unless-stopped
kafka:
build: ../components/kafka
image: xxx:${EMD_KAFKA}
container_name: image-kafka
environment:
KAFKA_ADVERTISED_HOST_NAME: 192.168.99.100
KAFKA_CREATE_TOPICS: "tslist:1:1,topic:1:1"
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_MESSAGE_MAX_BYTES: "15728640"
ports:
- 9092:9092
depends_on:
- zookeeper
restart: unless-stopped
Сообщение об ошибке
SLF4J: Failed toString() invocation on an object of type [org.apache.kafka.clients.NodeApiVersions]
Reported exception:
java.lang.NullPointerException
at org.apache.kafka.clients.NodeApiVersions.apiVersionToText(NodeApiVersions.java:167)
Itsне работает, сообщение не отправляется.