KafkaProducer не может выбрать schema.registry.url
, определенный в его свойствах.
Как мы можем видеть на следующем скриншоте, URL-адрес реестра схемы является фиктивным URL-адресом
// variable which is being debugged
private KafkaProducer<K, V> kafkaFullAckProducer;
Но все равно в моих журналах публикация сообщений с использованием KafkaProducer завершается неудачно с хостом как http://0:8081
{"@timestamp":"2018-10-31T18:57:37.906+05:30","message":"Failed to send HTTP request to endpoint: http://0:8081/subjects/
Эти два вышеупомянутых доказательства были взяты за один прогон программы. Как мы можем ясно увидеть, что URL-адрес schmearegistry во время отладки затмения - 123.1.1.1, но это http://0 в случае моих неудачных журналов.
Из-за этого в другой среде я не могу запустить другой назначенный schema.registry.url, потому что он всегда использует http://0
Код размещен на одном компьютере, а реестр / посредник схемы - на другом.
Реестр запущен в среде разработки ./confluent start
Код моего продюсера:
private KafkaProducer<K, V> kafkaFullAckProducer;
MonitoringConfig config;
public void produceWithFullAck(String brokerTopic, V genericRecord) throws Exception {
// key is null
ProducerRecord<K, V> record = new ProducerRecord<K, V>(brokerTopic, genericRecord);
try {
Future<RecordMetadata> futureHandle = this.kafkaFullAckProducer.send(record, (metadata, exception) -> {
if (metadata != null) {
log.info("Monitoring - Sent record(key=" + record.key() + " value=" + record.value()
+ " meta(partition=" + metadata.partition() + " offset=" + metadata.offset() + ")");
}
});
RecordMetadata recordMetadata = futureHandle.get();
} catch (Exception e) {
if (e.getCause() != null)
log.error("Monitoring - " + e.getCause().toString());
throw new RuntimeException(e.getMessage(), e.getCause());
} finally {
// initializer.getKafkaProducer().close();
}
}
@PostConstruct
private void initialize() {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.getBrokerList());
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());
// kafkaProps.put("value.serializer",
// "org.apache.kafka.common.serialization.ByteArraySerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, CustomAvroSerializer.class.getName());
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, Constants.COMPRESSION_TYPE_CONFIG);
props.put(ProducerConfig.RETRIES_CONFIG, config.getProducerRetryCount());
props.put("schema.registry.url", config.getSchemaRegistryUrl()); // the url is coming right here
props.put("acks", "all");
kafkaFullAckProducer = new KafkaProducer<K, V>(props);
}
MonitoringConfig:
открытый класс MonitoringConfig {
@Value("${kafka.zookeeper.connect}")
private String zookeeperConnect;
@Value("${kafka.broker.list}")
private String brokerList;
@Value("${consumer.timeout.ms:1000}")
private String consumerTimeout;
@Value("${producer.retry.count:1}")
private String producerRetryCount;
@Value("${schema.registry.url}")
private String schemaRegistryUrl;
@Value("${consumer.enable:false}")
private boolean isConsumerEnabled;
@Value("${consumer.count.thread}")
private int totalConsumerThread;
}
application.properties:
kafka.zookeeper.connect=http://localhost:2181
kafka.broker.list=http://localhost:9092
consumer.timeout.ms=1000
producer.retry.count=1
schema.registry.url=http://123.1.1.1:8082
Пользовательский avro-сериализатор - это то, что я должен осудить и использовать, как описано здесь , но я уверен, что это не является причиной этой проблемы.
Вот данные хостов:
ВЕДУЩИЙ 1: Имеет этот Java-сервис и Kafka Connect, и сюда приходят журналы ошибок.
ВЕДУЩИЙ 2: Имеет Кафку, Реестр Схем и Zookeper.