Мне удалось зарегистрировать схему, а также отправлять и читать сообщения из командной строки. Вот как я это сделал.
bin/kafka-avro-console-producer --broker-list b-****.xt08nj.c5.kafka.us-east-1.amazonaws.com:9092,b-6.****.xt08nj.c5.kafka.us-east-1.amazonaws.com:9092,b-***.xt08nj.c5.kafka.us-east-1.amazonaws.com:9092 --topic AVRO-AUDIT_EVENT --property schema.registry.url=http://localhost:8081 --property value.schema='{ "type":"record", "namespace":"com.example", "name":"AuditEvents", "doc":"Avro schema for our Audit Event table", "fields":[ {"name":"ID","type":"string","doc":"id for the event"}, {"name":"VERSION","type":"string","doc":"Version of audit event "}, {"name":"ACTION_TYPE","type":"string","doc":"what action has perfomed "}, {"name":"EVENT_TYPE","type":"string","doc":"what was the event "}, {"name":"CLIENT_ID","type":"string","doc":"id of the client "}, {"name":"DETAILS","type":"string","doc":"details about the event"}, {"name":"OBJECT_TYPE","type":"string","doc":"the type of audit event object"}, {"name":"UTC_DATE_TIME","type":"string","doc":"audit event date time "}, {"name":"POINT_IN_TIME_PRECISION","type":"string","doc":"POINT_IN_TIME_PRECISION"}, {"name":"TIME_ZONE","type":"string","doc":"TIME_ZONE"}, {"name":"TIMELINE_PRECISION","type":"string","doc":"TIMELINE_PRECISION"}, {"name":"GROUP_ID","type":"string","doc":"GROUP_ID"}, {"name":"OBJECT_DISPLAY_NAME","type":"string","doc":"OBJECT_DISPLAY_NAME"}, {"name":"OBJECT_ID","type":"string","doc":"OBJECT_ID"}, {"name":"USER_DISPLAY_NAME","type":"string","doc":"USER_DISPLAY_NAME"}, {"name":"USER_ID","type":"string","doc":"USER_ID"}, {"name":"PARENT_EVENT_ID","type":"string","doc":"PARENT_EVENT_ID"}, {"name":"NOTES","type":"string","doc":"NOTES"}, {"name":"SUMMARY","type":"string","doc":"SUMMARY"}, {"name":"AUDIT_EVENT_TO_UTC_DT","type":"string","doc":"AUDIT_EVENT_TO_UTC_DT"}, {"name":"AUDIT_EVENT_TO_DATE_PITP","type":"string","doc":"AUDIT_EVENT_TO_DATE_PITP"}, {"name":"AUDIT_EVENT_TO_DATE_TZ","type":"string","doc":"AUDIT_EVENT_TO_DATE_TZ"}, 
{"name":"AUDIT_EVENT_TO_DATE_TP","type":"string","doc":"AUDIT_EVENT_TO_DATE_TP"} ] }'
Я также смог читать сообщения с помощью консольного Avro-потребителя.
Теперь я хочу отправлять сообщения из Java-продюсера с некоторыми жёстко заданными значениями. При этом я получаю ошибку «Connection refused» (отказ в соединении).
Вот мой Java-код.
// Producer settings written into `props` (created outside this excerpt):
// "acks" is set to "all" and "retries" to "10".
props.setProperty("acks", "all");
props.setProperty("retries", "10");
// Avro part: keys use StringSerializer, values use KafkaAvroSerializer.
props.setProperty("key.serializer", StringSerializer.class.getName());
props.setProperty("value.serializer", KafkaAvroSerializer.class.getName());
// NOTE(review): the stack trace in this post shows "Connection refused" raised from
// RestService.registerSchema, called by KafkaAvroSerializer during KafkaProducer.send.
// Presumably the serializer is contacting this URL; confirm that a Schema Registry is
// actually reachable at localhost:8081 from the host where this code runs.
props.put("schema.registry.url", "http://localhost:8081/");
// Create the producer (String keys, AuditEvents values) from the properties above.
Producer<String, AuditEvents> producer = new KafkaProducer<String, AuditEvents>(props);
// Copied from the Avro examples: build one AuditEvents record with hard-coded
// values through the class's builder (newBuilder() ... build()).
AuditEvents auditEvent = AuditEvents.newBuilder().setID("avro-lambda-1-5d17-136a-9749-0e710000fd04")
.setVERSION("1").setACTIONTYPE("NEW_CASE").setEVENTTYPE("WORLDCHECK")
.setCLIENTID("fgh-5d1e-17a2-9749-0e4d0000146d")
// DETAILS carries an XML document as a single escaped string.
.setDETAILS(
"<?xml version=\"1.0\" encoding=\"UTF-8\" standalone=\"yes\"?><caseCreatedPayload><batchDetails/><caseId>0a348753-5d1e-17af-9749-0e7100006ccf</caseId><clientCaseId>case159</clientCaseId><caseName>William Mendoza</caseName><entityType>INDIVIDUAL</entityType><updateDate><dateTime>2017-07-08T22:02:32.211Z</dateTime><timeZone>UTC</timeZone><timelinePrecision>ON</timelinePrecision><pointInTimePrecision>TIME</pointInTimePrecision></updateDate><createDate><dateTime>2017-07-08T22:02:32.211Z</dateTime><timeZone>UTC</timeZone><timelinePrecision>ON</timelinePrecision><pointInTimePrecision>TIME</pointInTimePrecision></createDate><groupName>TEST_PERFORMANCE_CLIENT_2da27301-d44f-475f-92a2-1838b640d88b</groupName><nameTransposition><selected>false</selected><type>NAME_TRANSPOSITION</type><available>false</available></nameTransposition><gender>MALE</gender></caseCreatedPayload>")
.setOBJECTTYPE("CASE").setUTCDATETIME("1578469623000").setPOINTINTIMEPRECISION("TIME")
.setTIMEZONE("UTC").setTIMELINEPRECISION("ON").setGROUPID("0a348753-5d1e-17a2-9749-0e4d0000146d")
.setOBJECTDISPLAYNAME("NULL").setOBJECTID("0a348753-5d1e-17af-9749-0e7100006ccf")
.setUSERDISPLAYNAME("USER_FIRST_6cb4c322-cd3d-4809-97d3-07d2d96f10ed")
.setUSERID("USER_LAST_7e99cad9-dc1c-4770-ac4f-33c4897ce404")
.setPARENTEVENTID("parenteventid")
// NOTE(review): setEVENTTYPE was already called above with "WORLDCHECK"; this second
// call presumably replaces that value. Confirm which setter/value was intended here.
.setEVENTTYPE("0a348752-5d17-138e-9749-0e6a00000c7f").setNOTES("NOTES").setSUMMARY("SUMMARY IN XML")
// The remaining fields are filled with the literal strings "NULL"/"null", not Java null.
.setAUDITEVENTTOUTCDT("1578469623000").setAUDITEVENTTODATEPITP("NULL").setAUDITEVENTTODATETZ("null")
.setAUDITEVENTTODATETP("null").build();
// Wrap the event in a record for `topic` (defined outside this excerpt); only the
// topic and the value are passed, no key.
ProducerRecord<String, AuditEvents> producerRecord = new ProducerRecord<String, AuditEvents>(topic, auditEvent);
// Debug output of the event before it is sent.
System.out.println("*******auditEvent****"+auditEvent);
// Send with a callback: print the record metadata when no exception is reported,
// otherwise print the exception's stack trace.
// NOTE(review): per the stack trace in this post, the SerializationException is thrown
// from KafkaProducer.send itself (during serialization).
producer.send(producerRecord, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception == null) {
System.out.println(metadata);
} else {
exception.printStackTrace();
}
}
});
// Flush, then close the producer.
producer.flush();
producer.close();
Вот трассировка стека ошибки:
Error serializing Avro message: org.apache.kafka.common.errors.SerializationException
org.apache.kafka.common.errors.SerializationException: Error serializing Avro message
Caused by: java.net.ConnectException: Connection refused (Connection refused)
at java.net.PlainSocketImpl.socketConnect(Native Method)
at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
at java.net.Socket.connect(Socket.java:589)
at sun.net.NetworkClient.doConnect(NetworkClient.java:175)
at sun.net.www.http.HttpClient.openServer(HttpClient.java:463)
at sun.net.www.http.HttpClient.openServer(HttpClient.java:558)
at sun.net.www.http.HttpClient.<init>(HttpClient.java:242)
at sun.net.www.http.HttpClient.New(HttpClient.java:339)
at sun.net.www.http.HttpClient.New(HttpClient.java:357)
at sun.net.www.protocol.http.HttpURLConnection.getNewHttpClient(HttpURLConnection.java:1220)
at sun.net.www.protocol.http.HttpURLConnection.plainConnect0(HttpURLConnection.java:1156)
at sun.net.www.protocol.http.HttpURLConnection.plainConnect(HttpURLConnection.java:1050)
at sun.net.www.protocol.http.HttpURLConnection.connect(HttpURLConnection.java:984)
at sun.net.www.protocol.http.HttpURLConnection.getOutputStream0(HttpURLConnection.java:1334)
at sun.net.www.protocol.http.HttpURLConnection.getOutputStream(HttpURLConnection.java:1309)
at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:199)
at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:256)
at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:356)
at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:348)
at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:334)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.registerAndGetId(CachedSchemaRegistryClient.java:168)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:222)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:198)
at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:70)
at io.confluent.kafka.serializers.KafkaAvroSerializer.serialize(KafkaAvroSerializer.java:53)
at org.apache.kafka.common.serialization.Serializer.serialize(Serializer.java:62)
at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:894)
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:856)
at com.amazonaws.lambda.demo.Producer_apilambdamskavro.handleRequest(Producer_apilambdamskavro.java:68)
at com.amazonaws.lambda.demo.Producer_apilambdamskavro.handleRequest(Producer_apilambdamskavro.java:20)
Пожалуйста, подскажите, чего мне не хватает. Может быть, я неправильно создал схему?