KafkaProducer всегда выбирает localhost: 8081 для реестра схемы в Java API - PullRequest
0 голосов
/ 31 октября 2018

KafkaProducer не может выбрать schema.registry.url, определенный в его свойствах.

Как мы можем видеть на следующем скриншоте, URL-адрес реестра схемы является фиктивным URL-адресом

// variable which is being debugged
private KafkaProducer<K, V> kafkaFullAckProducer;

enter image description here

Но все равно в моих журналах публикация сообщений с использованием KafkaProducer завершается неудачно с хостом как http://0:8081

{"@timestamp":"2018-10-31T18:57:37.906+05:30","message":"Failed to send HTTP request to endpoint: http://0:8081/subjects/

Эти два вышеупомянутых доказательства были взяты за один прогон программы. Как мы можем ясно увидеть, что URL-адрес schmearegistry во время отладки затмения - 123.1.1.1, но это http://0 в случае моих неудачных журналов.

Из-за этого в другой среде я не могу запустить другой назначенный schema.registry.url, потому что он всегда использует http://0

Код размещен на одном компьютере, а реестр / посредник схемы - на другом.

Реестр запущен в среде разработки ./confluent start

Код моего продюсера:

private KafkaProducer<K, V> kafkaFullAckProducer;
MonitoringConfig config;

public void produceWithFullAck(String brokerTopic, V genericRecord) throws Exception {
    // key is null
    ProducerRecord<K, V> record = new ProducerRecord<K, V>(brokerTopic, genericRecord);
    try {
        Future<RecordMetadata> futureHandle = this.kafkaFullAckProducer.send(record, (metadata, exception) -> {

            if (metadata != null) {
                log.info("Monitoring - Sent record(key=" + record.key() + " value=" + record.value()
                        + " meta(partition=" + metadata.partition() + " offset=" + metadata.offset() + ")");
            }
        });
        RecordMetadata recordMetadata = futureHandle.get();

    } catch (Exception e) {
        if (e.getCause() != null)
            log.error("Monitoring - " + e.getCause().toString());
        throw new RuntimeException(e.getMessage(), e.getCause());
    } finally {
        // initializer.getKafkaProducer().close();
    }
}

@PostConstruct
private void initialize() {
    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.getBrokerList());
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());
    // kafkaProps.put("value.serializer",
    // "org.apache.kafka.common.serialization.ByteArraySerializer");
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, CustomAvroSerializer.class.getName());
    props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, Constants.COMPRESSION_TYPE_CONFIG);
    props.put(ProducerConfig.RETRIES_CONFIG, config.getProducerRetryCount());
    props.put("schema.registry.url", config.getSchemaRegistryUrl()); // the url is coming right here 
    props.put("acks", "all");
    kafkaFullAckProducer = new KafkaProducer<K, V>(props);
}

MonitoringConfig: открытый класс MonitoringConfig {

    @Value("${kafka.zookeeper.connect}")
    private String zookeeperConnect;

    @Value("${kafka.broker.list}")
    private String brokerList;

    @Value("${consumer.timeout.ms:1000}")
    private String consumerTimeout;

    @Value("${producer.retry.count:1}")
    private String producerRetryCount;

    @Value("${schema.registry.url}")
    private String schemaRegistryUrl;

    @Value("${consumer.enable:false}")
    private boolean isConsumerEnabled;

    @Value("${consumer.count.thread}")
    private int totalConsumerThread;
}

application.properties:

kafka.zookeeper.connect=http://localhost:2181
kafka.broker.list=http://localhost:9092
consumer.timeout.ms=1000
producer.retry.count=1
schema.registry.url=http://123.1.1.1:8082

Пользовательский avro-сериализатор - это то, что я должен осудить и использовать, как описано здесь , но я уверен, что это не является причиной этой проблемы.

Вот данные хостов: ВЕДУЩИЙ 1: Имеет этот Java-сервис и Kafka Connect, и сюда приходят журналы ошибок. ВЕДУЩИЙ 2: Имеет Кафку, Реестр Схем и Zookeper.

1 Ответ

0 голосов
/ 01 ноября 2018

Вы используете пользовательский сериализатор, и как часть реализации Serializer, вы должны определить метод configure, который принимает карту.

В этом методе, я полагаю, вы определили поле CachedSchemaRegistryClient, но не извлекли свойство url, добавленное на уровне источника, из карты конфигурации, и поэтому по умолчанию оно будет равно используя другой адрес localhost

Код Confluent требует прохождения четырех классов, но вы увидите реализацию Serializer здесь , затем посмотрите на отдельный класс Config, а также родительские классы Abstract Serializer и SerDe. Начиная с вашего предыдущего поста , я указывал, что я не думал, что вам действительно нужно использовать собственный сериализатор Avro, потому что вы, похоже, переделываете то, что делает класс AbstractKafkaSerializer

...