Ошибка подключения при создании записи в MSK topi c с использованием реестра схемы - PullRequest
0 голосов
/ 15 января 2020

Так что я смог зарегистрировать схему, а также производить и использовать из командной строки. Вот как я это сделал.

bin/kafka-avro-console-producer --broker-list b-****.xt08nj.c5.kafka.us-east-1.amazonaws.com:9092,b-6.****.xt08nj.c5.kafka.us-east-1.amazonaws.com:9092,b-***.xt08nj.c5.kafka.us-east-1.amazonaws.com:9092 --topic AVRO-AUDIT_EVENT --property schema.registry.url=http://localhost:8081 --property value.schema='{ "type":"record", "namespace":"com.example", "name":"AuditEvents", "doc":"Avro schema for our Audit Event table", "fields":[ {"name":"ID","type":"string","doc":"id for the event"}, {"name":"VERSION","type":"string","doc":"Version of audit event "}, {"name":"ACTION_TYPE","type":"string","doc":"what action has perfomed "}, {"name":"EVENT_TYPE","type":"string","doc":"what was the event "}, {"name":"CLIENT_ID","type":"string","doc":"id of the client "}, {"name":"DETAILS","type":"string","doc":"details about the event"}, {"name":"OBJECT_TYPE","type":"string","doc":"the type of audit event object"}, {"name":"UTC_DATE_TIME","type":"string","doc":"audit event date time "}, {"name":"POINT_IN_TIME_PRECISION","type":"string","doc":"POINT_IN_TIME_PRECISION"}, {"name":"TIME_ZONE","type":"string","doc":"TIME_ZONE"}, {"name":"TIMELINE_PRECISION","type":"string","doc":"TIMELINE_PRECISION"}, {"name":"GROUP_ID","type":"string","doc":"GROUP_ID"}, {"name":"OBJECT_DISPLAY_NAME","type":"string","doc":"OBJECT_DISPLAY_NAME"}, {"name":"OBJECT_ID","type":"string","doc":"OBJECT_ID"}, {"name":"USER_DISPLAY_NAME","type":"string","doc":"USER_DISPLAY_NAME"}, {"name":"USER_ID","type":"string","doc":"USER_ID"}, {"name":"PARENT_EVENT_ID","type":"string","doc":"PARENT_EVENT_ID"}, {"name":"NOTES","type":"string","doc":"NOTES"}, {"name":"SUMMARY","type":"string","doc":"SUMMARY"}, {"name":"AUDIT_EVENT_TO_UTC_DT","type":"string","doc":"AUDIT_EVENT_TO_UTC_DT"}, {"name":"AUDIT_EVENT_TO_DATE_PITP","type":"string","doc":"AUDIT_EVENT_TO_DATE_PITP"}, {"name":"AUDIT_EVENT_TO_DATE_TZ","type":"string","doc":"AUDIT_EVENT_TO_DATE_TZ"}, {"name":"AUDIT_EVENT_TO_DATE_TP","type":"string","doc":"AUDIT_EVENT_TO_DATE_TP"} ] }'

Я также мог использовать с консоли avro.

Теперь я хочу добавить это от java производителя с некоторым жестко заданным значением. Когда я делаю это, я получаю сообщение об отказе в соединении.

Вот мой java код.

props.setProperty("acks", "all");
        props.setProperty("retries", "10");
        // avro part
        props.setProperty("key.serializer", StringSerializer.class.getName());
        props.setProperty("value.serializer", KafkaAvroSerializer.class.getName());

        props.put("schema.registry.url", "http://localhost:8081/");

        Producer<String, AuditEvents> producer = new KafkaProducer<String, AuditEvents>(props);

        // copied from avro examples
        AuditEvents auditEvent = AuditEvents.newBuilder().setID("avro-lambda-1-5d17-136a-9749-0e710000fd04")
                .setVERSION("1").setACTIONTYPE("NEW_CASE").setEVENTTYPE("WORLDCHECK")
                .setCLIENTID("fgh-5d1e-17a2-9749-0e4d0000146d")
                .setDETAILS(
                        "<?xml version=\"1.0\" encoding=\"UTF-8\" standalone=\"yes\"?><caseCreatedPayload><batchDetails/><caseId>0a348753-5d1e-17af-9749-0e7100006ccf</caseId><clientCaseId>case159</clientCaseId><caseName>William Mendoza</caseName><entityType>INDIVIDUAL</entityType><updateDate><dateTime>2017-07-08T22:02:32.211Z</dateTime><timeZone>UTC</timeZone><timelinePrecision>ON</timelinePrecision><pointInTimePrecision>TIME</pointInTimePrecision></updateDate><createDate><dateTime>2017-07-08T22:02:32.211Z</dateTime><timeZone>UTC</timeZone><timelinePrecision>ON</timelinePrecision><pointInTimePrecision>TIME</pointInTimePrecision></createDate><groupName>TEST_PERFORMANCE_CLIENT_2da27301-d44f-475f-92a2-1838b640d88b</groupName><nameTransposition><selected>false</selected><type>NAME_TRANSPOSITION</type><available>false</available></nameTransposition><gender>MALE</gender></caseCreatedPayload>")
                .setOBJECTTYPE("CASE").setUTCDATETIME("1578469623000").setPOINTINTIMEPRECISION("TIME")
                .setTIMEZONE("UTC").setTIMELINEPRECISION("ON").setGROUPID("0a348753-5d1e-17a2-9749-0e4d0000146d")
                .setOBJECTDISPLAYNAME("NULL").setOBJECTID("0a348753-5d1e-17af-9749-0e7100006ccf")
                .setUSERDISPLAYNAME("USER_FIRST_6cb4c322-cd3d-4809-97d3-07d2d96f10ed")
                .setUSERID("USER_LAST_7e99cad9-dc1c-4770-ac4f-33c4897ce404")
                .setPARENTEVENTID("parenteventid")
                .setEVENTTYPE("0a348752-5d17-138e-9749-0e6a00000c7f").setNOTES("NOTES").setSUMMARY("SUMMARY IN XML")
                .setAUDITEVENTTOUTCDT("1578469623000").setAUDITEVENTTODATEPITP("NULL").setAUDITEVENTTODATETZ("null")
                .setAUDITEVENTTODATETP("null").build();

        ProducerRecord<String, AuditEvents> producerRecord = new ProducerRecord<String, AuditEvents>(topic, auditEvent);

        System.out.println("*******auditEvent****"+auditEvent);
        producer.send(producerRecord, new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if (exception == null) {
                    System.out.println(metadata);
                } else {
                    exception.printStackTrace();
                }
            }
        });

        producer.flush();
        producer.close();

Это трассировка стека ошибок

Error serializing Avro message: org.apache.kafka.common.errors.SerializationException
org.apache.kafka.common.errors.SerializationException: Error serializing Avro message
Caused by: java.net.ConnectException: Connection refused (Connection refused)
    at java.net.PlainSocketImpl.socketConnect(Native Method)
    at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
    at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
    at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
    at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
    at java.net.Socket.connect(Socket.java:589)
    at sun.net.NetworkClient.doConnect(NetworkClient.java:175)
    at sun.net.www.http.HttpClient.openServer(HttpClient.java:463)
    at sun.net.www.http.HttpClient.openServer(HttpClient.java:558)
    at sun.net.www.http.HttpClient.<init>(HttpClient.java:242)
    at sun.net.www.http.HttpClient.New(HttpClient.java:339)
    at sun.net.www.http.HttpClient.New(HttpClient.java:357)
    at sun.net.www.protocol.http.HttpURLConnection.getNewHttpClient(HttpURLConnection.java:1220)
    at sun.net.www.protocol.http.HttpURLConnection.plainConnect0(HttpURLConnection.java:1156)
    at sun.net.www.protocol.http.HttpURLConnection.plainConnect(HttpURLConnection.java:1050)
    at sun.net.www.protocol.http.HttpURLConnection.connect(HttpURLConnection.java:984)
    at sun.net.www.protocol.http.HttpURLConnection.getOutputStream0(HttpURLConnection.java:1334)
    at sun.net.www.protocol.http.HttpURLConnection.getOutputStream(HttpURLConnection.java:1309)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:199)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:256)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:356)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:348)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:334)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.registerAndGetId(CachedSchemaRegistryClient.java:168)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:222)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:198)
    at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:70)
    at io.confluent.kafka.serializers.KafkaAvroSerializer.serialize(KafkaAvroSerializer.java:53)
    at org.apache.kafka.common.serialization.Serializer.serialize(Serializer.java:62)
    at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:894)
    at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:856)
    at com.amazonaws.lambda.demo.Producer_apilambdamskavro.handleRequest(Producer_apilambdamskavro.java:68)
    at com.amazonaws.lambda.demo.Producer_apilambdamskavro.handleRequest(Producer_apilambdamskavro.java:20)

Пожалуйста, предложите чего мне не хватает То, как я создал схему, было неверным?

1 Ответ

0 голосов
/ 15 января 2020

Вы не указали, где работает ваш код, но трассировка стека показывает, что ваш адрес реестра неверен или реестр не работает по адресу, который вы указали

props.put("schema.registry.url", "http://localhost:8081/");

Если он не работает на тот же физический компьютер, что и у производителя, вам нужно изменить адрес так, чтобы он указывал на правильное местоположение

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...