На KafkaProducer.send (сообщение) я получаю "исключение Ошибка сериализации сообщения Avro" - PullRequest
0 голосов
/ 11 декабря 2018

Я использую Avro для создания класса.Вот мой код в продюсере выглядит так:

TweetInfo tweetInfo = TweetInfo.newBuilder()
                    .setTweetId(status.getId())
                    .setTweetCreatedAt(status.getCreatedAt().toString())
                    .setTweetMessage(status.getText())
                    .setUserId(user.getId())
                    .setUserCreatedAt(user.getCreatedAt().toString())
                    .setUserName(user.getName())
                    .setUserScreenName(user.getScreenName())
                    .build();

            ProducerRecord<String, TweetInfo> data = new ProducerRecord(KafkaConstants.TOPIC, tweetInfo);
            producer.send(data);

TweetInfo - класс, сгенерированный схемой Avro.Когда я запускаю программу, я вижу трассировку стека следующим образом

    2018-12-11 01:51:58.138  WARN 16244 --- [c Dispatcher[0]] o.i.service.kafka.TweetKafkaProducer     : exception Error serializing Avro message
2018-12-11 01:51:59.162 ERROR 16244 --- [c Dispatcher[0]] i.c.k.s.client.rest.RestService          : Failed to send HTTP request to endpoint: http://localhost:8081/subjects/twitterData-value/versions

java.net.ConnectException: Connection refused: connect
    at java.net.DualStackPlainSocketImpl.waitForConnect(Native Method) ~[na:1.8.0_152]
    at java.net.DualStackPlainSocketImpl.socketConnect(DualStackPlainSocketImpl.java:85) ~[na:1.8.0_152]
    at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350) ~[na:1.8.0_152]
    at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206) ~[na:1.8.0_152]
    at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188) ~[na:1.8.0_152]
    at java.net.PlainSocketImpl.connect(PlainSocketImpl.java:172) ~[na:1.8.0_152]
    at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392) ~[na:1.8.0_152]
    at java.net.Socket.connect(Socket.java:589) ~[na:1.8.0_152]
    at sun.net.NetworkClient.doConnect(NetworkClient.java:175) ~[na:1.8.0_152]
    at sun.net.www.http.HttpClient.openServer(HttpClient.java:463) ~[na:1.8.0_152]
    at sun.net.www.http.HttpClient.openServer(HttpClient.java:558) ~[na:1.8.0_152]
    at sun.net.www.http.HttpClient.<init>(HttpClient.java:242) ~[na:1.8.0_152]
    at sun.net.www.http.HttpClient.New(HttpClient.java:339) ~[na:1.8.0_152]
    at sun.net.www.http.HttpClient.New(HttpClient.java:357) ~[na:1.8.0_152]
    at sun.net.www.protocol.http.HttpURLConnection.getNewHttpClient(HttpURLConnection.java:1220) ~[na:1.8.0_152]
    at sun.net.www.protocol.http.HttpURLConnection.plainConnect0(HttpURLConnection.java:1156) ~[na:1.8.0_152]
    at sun.net.www.protocol.http.HttpURLConnection.plainConnect(HttpURLConnection.java:1050) ~[na:1.8.0_152]
    at sun.net.www.protocol.http.HttpURLConnection.connect(HttpURLConnection.java:984) ~[na:1.8.0_152]
    at sun.net.www.protocol.http.HttpURLConnection.getOutputStream0(HttpURLConnection.java:1334) ~[na:1.8.0_152]
    at sun.net.www.protocol.http.HttpURLConnection.getOutputStream(HttpURLConnection.java:1309) ~[na:1.8.0_152]
    at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:178) [kafka-schema-registry-client-5.0.1.jar:na]
    at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:235) [kafka-schema-registry-client-5.0.1.jar:na]
    at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:326) [kafka-schema-registry-client-5.0.1.jar:na]
    at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:318) [kafka-schema-registry-client-5.0.1.jar:na]
    at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:313) [kafka-schema-registry-client-5.0.1.jar:na]
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.registerAndGetId(CachedSchemaRegistryClient.java:114) [kafka-schema-registry-client-5.0.1.jar:na]
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:153) [kafka-schema-registry-client-5.0.1.jar:na]
    at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:79) [kafka-avro-serializer-5.0.1.jar:na]
    at io.confluent.kafka.serializers.KafkaAvroSerializer.serialize(KafkaAvroSerializer.java:53) [kafka-avro-serializer-5.0.1.jar:na]
    at org.apache.kafka.common.serialization.Serializer.serialize(Serializer.java:60) [kafka-clients-2.1.0.jar:na]
    at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:879) [kafka-clients-2.1.0.jar:na]
    at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:841) [kafka-clients-2.1.0.jar:na]
    at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:728) [kafka-clients-2.1.0.jar:na]
    at org.interview.service.kafka.TweetKafkaProducer$1.onStatus(TweetKafkaProducer.java:95) [classes/:na]
    at twitter4j.StatusStreamImpl.onStatus(StatusStreamImpl.java:75) [twitter4j-stream-4.0.6.jar:4.0.6]
    at twitter4j.StatusStreamBase$1.run(StatusStreamBase.java:105) [twitter4j-stream-4.0.6.jar:4.0.6]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [na:1.8.0_152]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [na:1.8.0_152]
    at java.lang.Thread.run(Thread.java:748) [na:1.8.0_152]

У меня работает zookeeper и kafka.Нужно ли мне также запускать Schema Registry?Если да, то есть ли руководство для этого?Я не могу найти ни одного.

Ответы [ 2 ]

0 голосов
/ 13 декабря 2018

Как сказал @ cricket_007, если вы находитесь в Windows, попробуйте использовать docker.

Ниже ссылки на docker compose, которая запустит kafka, zookeeper, реестр схемы и остальные kafka, с помощью которых вы можете протестировать вашего производителя.без труда.https://github.com/confluentinc/docker-images/blob/master/examples/fullstack/docker-compose.yml

РЕДАКТИРОВАТЬ : Извините, мой плохой, это ссылка старого репо, проверьте нижеприведенный, у вас есть все сливные платформы (вы можете удалить то, что вам не нужно)!

https://github.com/confluentinc/cp-docker-images/blob/5.0.1-post/examples/cp-all-in-one/docker-compose.yml

0 голосов
/ 11 декабря 2018

Не удалось отправить HTTP-запрос конечной точке

Должен быть запущен сервер реестра Confluent Schema.И вы, возможно, захотите попробовать поразить конечные точки HTTP самостоятельно (см. Документы ниже).

Не уверен, как вы его запустили, но вы можете скачать Confluent OSS , распаковать его куда-нибудь, затем втерминал, вам нужно перейти к папке bin извлеченной папки и запустить confluent start schema-registry. Примечание : это работает только для Linux.

Или, если вам нужна конфигурация «производственного развертывания», сначала нужно будет отредактировать файлы свойств в папке etc и запустить каждый из Zookeeper, Kafka и Registry с помощью соответствующих сценариев.

Документы: Запуск реестра схемы


Относительно комментариев

Когда я пытаюсь выполнить команды в статье, выдает ошибку, чтоbin не действительна команда

$ bin/... сначала предполагает, что у вас есть cd 'd в папке confluent-x.x.x, которая была извлечена


Кстати, есть существующих проектов Kafka Connect , которые взаимодействуют с API Twitter.

...