Я использую Confluent 5.0 на сервере Linux, используя Winscp и Putty. У меня есть приложение Kafka (Java / Eclipse) в Windows.
Когда я запускаю приложение Java, оно не идентифицирует брокеров Kafka в Confluent, работающем в Linux.
Я протестировал свое приложение Java, которое отправляет данные в разделы Kafka в MACBook, запустив Confluent 5.0 в MAC Terminal. Сейчас я пытаюсь реализовать то же приложение Kafka в Windows. Поскольку Confluent не поддерживается в Windows, я работаю на сервере Linux.
Я использую Confluent вместо Apache Kafka, потому что я использую Schema-Registry в своем приложении.
Используя netstat -tupln & curl -v http: / localhost: номер порта. выяснил, работает Кафка на 8082 и реестр схемы на 8081 подробности о портах .
Ниже приведены мои свойства Kafka в Java-приложении.
public static Properties producerProperties() {
// normal producer
properties.setProperty("bootstrap.servers", "127.0.0.1:8082");
properties.setProperty("acks", "all");
properties.setProperty("retries", "10");
// avro part
properties.setProperty("key.serializer", StringSerializer .class.getName());
properties.setProperty("value.serializer", KafkaAvroSerializer .class.getName());
properties.setProperty("schema.registry.url", "http://127.0.0.1:8081");
return properties;
}
public static Properties consumerProperties() {
// Properties properties = new Properties();
// normal consumer
properties.setProperty("bootstrap.servers", "127.0.0.1:8082");
//different for consumer
properties.setProperty("group.id", "Avro-consumer");
properties.setProperty("enable.auto.commit", "false");
properties.setProperty("auto.offset.reset", "earliest");
// avro part
properties.setProperty("key.deserializer", StringDeserializer.class.getName());
properties.setProperty("value.deserializer", KafkaAvroDeserializer.class.getName());
properties.setProperty("schema.registry.url", "http://127.0.0.1:8081");
properties.setProperty("specific.avro.reader", "true");
return properties;
}
public static Properties streamsProperties() {
// normal consumer
properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "com.github.ptn006");
properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:8082");
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
return properties;
}
Ожидаемое:
Данные записаны в темы Кафки.
Фактический:
WARN Соединение с узлом -1 не может быть установлено. Брокер может быть недоступен. (Org.apache.kafka.clients.NetworkClient: 589)