Kafka Producer-Consumer не может производить / потреблять avro данные - PullRequest
0 голосов
/ 31 октября 2018

Я написал код производителя kafka для получения данных avro, но он показывает следующую ошибку при сериализации данных:

Исключение в теме "главная" org.apache.kafka.common.errors.SerializationException: ошибка сериализация авро сообщения Вызывается: java.net.UnknownHostException: sandbox-hdf.hortonworks.com atjava.net.AbstractPlainSocketImpl.connect (AbstractPlainSocketImpl.java:184) в java.net.PlainSocketImpl.connect (PlainSocketImpl.java:172) в java.net.SocksSocketImpl.connect (SocksSocketImpl.java:392) на java.net.Socket.connect (Socket.java:589) на java.net.Socket.connect (Socket.java:538) на sun.net.NetworkClient.doConnect (NetworkClient.java:180) на sun.net.www.http.HttpClient.openServer (HttpClient.java:463) на sun.net.www.http.HttpClient.openServer (HttpClient.java:558) на sun.net.www.http.HttpClient. (HttpClient.java:242) на sun.net.www.http.HttpClient.New (HttpClient.java:339) на sun.net.www.http.HttpClient.New (HttpClient.java:357) на sun.net.www.protocol.http.HttpURLConnection.getNewHttpClient (HttpURLConnection.java:1220) на sun.net.www.protocol.http.HttpURLConnection.plainConnect0 (HttpURLConnection.java:1156) на sun.net.www.protocol.http.HttpURLConnection.plainConnect (HttpURLConnection.java:1050) на sun.net.www.protocol.http.HttpURLConnection.connect (HttpURLConnection.java:984) ......

Ниже приведен мой код производителя:

package com.perfaware.kafka01;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.example.Customer;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class producerAvro {

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        Properties properties = new Properties();
        // setting producer properties

        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("acks", "1");
        properties.setProperty("retries", "10");

        // Serialization(avro part)
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());
        properties.setProperty("schema.registry.url", "http://sandbox-hdf.hortonworks.com:7788/api/v1");

        Producer<String, Customer> producer = new KafkaProducer<String, Customer>(properties);

        String topic = "topic1";

        Customer customer = Customer.newBuilder()
                .setAge(21)
                .setAutomatedEmail(false)
                .setFirstName("Manish")
                .setLastName("B.")
                .setHeight(176f)
                .setWeight(62f)
                .build();

        ProducerRecord<String, Customer> producerRecord = new ProducerRecord<String, Customer>("topic1", customer);

        System.out.println(customer);
        producer.send(producerRecord, new Callback() {
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if (exception == null) {
                    System.out.println(metadata.toString());
                } else {
                    exception.printStackTrace();
                }
            }
        }).get();

        producer.flush();
        producer.close();
    }
}

Я также присоединяю свой файл pom.xml, если это помогает:

 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>Kafka_Avro</groupId>
  <artifactId>Kafka_Avro_Practise</artifactId>
  <version>0.0.1-SNAPSHOT</version>

  <properties>
    <avro.verion>1.7.4</avro.verion>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <confluent.version>3.1.1</confluent.version>
  </properties>

 <repositories>
        <repository>
            <id>confluent</id>
            <url>http://packages.confluent.io/maven/</url>
        </repository>
 </repositories>

  <dependencies>


  <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-tools -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-tools</artifactId>
    <version>2.0.0</version>
</dependency>



  <dependency>
            <groupId>io.confluent</groupId>
            <artifactId>kafka-avro-serializer</artifactId>
            <version>3.1.1</version>
        </dependency>

  <!-- https://mvnrepository.com/artifact/org.apache.avro/avro -->
<dependency>
    <groupId>org.apache.avro</groupId>
    <artifactId>avro</artifactId>
    <version>1.8.2</version>
</dependency>

<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>1.1.1</version>
</dependency>

<!-- https://mvnrepository.com/artifact/org.apache.avro/avro-compiler -->
<dependency>
    <groupId>org.apache.avro</groupId>
    <artifactId>avro-compiler</artifactId>
    <version>1.8.2</version>
</dependency>

<dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
            <version>2.3.1</version>
            <scope>provided</scope>
        </dependency>

<!-- https://mvnrepository.com/artifact/org.apache.maven.plugins/maven-compiler-plugin -->
<dependency>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-compiler-plugin</artifactId>
    <version>3.8.0</version>
</dependency>

<!-- https://mvnrepository.com/artifact/org.apache.avro/avro-mapred -->
<dependency>
    <groupId>org.apache.avro</groupId>
    <artifactId>avro-mapred</artifactId>
    <version>1.8.1</version>
</dependency>

<!-- https://mvnrepository.com/artifact/org.apache.avro/avro-ipc -->
<dependency>
    <groupId>org.apache.avro</groupId>
    <artifactId>avro-ipc</artifactId>
    <version>1.8.2</version>
</dependency>

<!-- https://mvnrepository.com/artifact/commons-codec/commons-codec -->
<dependency>
    <groupId>commons-codec</groupId>
    <artifactId>commons-codec</artifactId>
    <version>1.11</version>
</dependency>

  </dependencies>

   <build>
        <plugins>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

Я также попытался изменить сериализатор значений:

com.hortonworks.registries.schemaregistry.serdes.avro.kafka.KafkaAvroSerializer

но это не решило проблему.

1 Ответ

0 голосов
/ 31 октября 2018

UnknownHostException: sandbox-hdf.hortonworks.com

Если вы используете песочницу, вы должны отредактировать файл /etc/hosts, чтобы сделать этот хост известным

Вы определенно захотите использовать сериализатор Hortonworks, хотя, если используете их реестр. Непонятно, какую ошибку вы получили при его использовании, но если то же самое, то это проблема с сетью, не имеющая ничего общего с Avro.

"value.serializer","com.hortonworks.registries.schemaregistry.serdes.avro.kafka.KafkaAvroSerializer"

Кроме того, bootstrap.servers, вероятно, потребуется также разрешить экземпляры песочницы Kafka, а не только localhost

Если вы хотите использовать Confluent, хотя я не уверен, что он будет работать, вам нужно будет использовать согласованные номера версий Kafka: например, Вы поставили Kafka 1.1.1, 2.0 и Confluent 3.1.1, основанный на Kafka 0.10.x.
Аналогично с Avro - все должно быть установлено, например, только 1.8.1, хотя вам не нужны библиотеки IPC или Mapred Avro для работы вашего кода. Вероятно, не компилятор тоже.

...