Ниже приведён код, который я написал для чтения записей Avro из Kafka topic. Я взял файл .avsc, сгенерировал из него класс (paymentengine) с помощью Maven и читаю записи с помощью SpecificAvroRecord. Записи читаются успешно. Теперь мне нужно выполнить некоторую проверку этих записей и вставить их в таблицу.
package com.example.consumer;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
/**
 * Console consumer that reads Avro-encoded {@code pengine} records from the
 * {@code pengine} Kafka topic and prints each record and its {@code vcp} field.
 *
 * <p>Auto-commit is disabled; offsets are committed manually once per polled
 * batch, after every record of that batch has been handled.
 */
public class PayKafkaSpecifcAvro {
    /**
     * Configures the consumer, subscribes to the topic and polls forever.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // setting properties
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName());
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
        // Deserialize into the generated class instead of GenericRecord.
        props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");

        // name topic
        String topic = "pengine";

        // try-with-resources: the consumer is closed (leaving the group cleanly)
        // if the poll loop is ever terminated by an exception.
        try (KafkaConsumer<String, pengine> consumer = new KafkaConsumer<String, pengine>(props)) {
            // subscribe to topic
            consumer.subscribe(Collections.singleton(topic));
            System.out.println("Waiting for the data...");

            while (true) {
                ConsumerRecords<String, pengine> records = consumer.poll(Duration.ofMillis(5000));
                for (ConsumerRecord<String, pengine> record : records) {
                    pengine value = record.value();
                    System.out.println(value);
                    if (value == null) {
                        // Null-valued (tombstone) record: nothing further to read from it.
                        continue;
                    }
                    System.out.println(value.getVcp());
                }
                // commitSync() commits the offsets of the whole last poll(), so it must
                // run only after every record of the batch has been processed; calling
                // it inside the loop would mark unprocessed records as consumed.
                if (!records.isEmpty()) {
                    consumer.commitSync();
                }
            }
        }
    }
}
Вывод:
Поскольку вывод имеет формат JSON, как я могу преобразовать его в строку и сравнить? Мне нужно сравнить значение ACH и VCP, и если значения совпадают, необходимо отметить эту строку как ошибку. Кроме того, преобразование этих записей в строку помогло бы мне также вставить эти записи в базу данных.
класс pengine:
package com.example.consumer; /**
* Autogenerated by Avro
*
* DO NOT EDIT DIRECTLY
*/
import org.apache.avro.message.BinaryMessageDecoder;
import org.apache.avro.message.BinaryMessageEncoder;
import org.apache.avro.message.SchemaStore;
import org.apache.avro.specific.SpecificData;
import org.apache.avro.util.Utf8;
/**
 * Avro-generated specific record for the {@code pengine} schema: a record with
 * three string fields, {@code tin}, {@code ach} and {@code vcp}, in that order.
 *
 * <p>The fields are typed as {@link CharSequence}. NOTE(review): after decoding
 * they are presumably {@link Utf8} instances rather than {@link String}, so
 * compare values via {@code toString()} - confirm against the Avro runtime.
 *
 * <p>This file is generated from the schema; regenerate it instead of editing.
 */
@org.apache.avro.specific.AvroGenerated
public class pengine extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord {
private static final long serialVersionUID = -3169039590588895557L;
// Parsed schema: record "pengine" with string fields tin (0), ach (1), vcp (2).
public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"pengine\",\"fields\":[{\"name\":\"tin\",\"type\":\"string\"},{\"name\":\"ach\",\"type\":\"string\"},{\"name\":\"vcp\",\"type\":\"string\"}]}");
/** Returns the schema shared by all instances of this class. */
public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; }
// Data model used to build the encoder/decoder and the datum reader/writer below.
private static SpecificData MODEL$ = new SpecificData();
private static final BinaryMessageEncoder<pengine> ENCODER =
new BinaryMessageEncoder<pengine>(MODEL$, SCHEMA$);
private static final BinaryMessageDecoder<pengine> DECODER =
new BinaryMessageDecoder<pengine>(MODEL$, SCHEMA$);
/**
* Return the BinaryMessageEncoder instance used by this class.
* @return the message encoder used by this class
*/
public static BinaryMessageEncoder<pengine> getEncoder() {
return ENCODER;
}
/**
* Return the BinaryMessageDecoder instance used by this class.
* @return the message decoder used by this class
*/
public static BinaryMessageDecoder<pengine> getDecoder() {
return DECODER;
}
/**
* Create a new BinaryMessageDecoder instance for this class that uses the specified {@link SchemaStore}.
* @param resolver a {@link SchemaStore} used to find schemas by fingerprint
* @return a BinaryMessageDecoder instance for this class backed by the given SchemaStore
*/
public static BinaryMessageDecoder<pengine> createDecoder(SchemaStore resolver) {
return new BinaryMessageDecoder<pengine>(MODEL$, SCHEMA$, resolver);
}
/**
* Serializes this pengine to a ByteBuffer.
* @return a buffer holding the serialized data for this instance
* @throws java.io.IOException if this instance could not be serialized
*/
public java.nio.ByteBuffer toByteBuffer() throws java.io.IOException {
return ENCODER.encode(this);
}
/**
* Deserializes a pengine from a ByteBuffer.
* @param b a byte buffer holding serialized data for an instance of this class
* @return a pengine instance decoded from the given buffer
* @throws java.io.IOException if the given bytes could not be deserialized into an instance of this class
*/
public static pengine fromByteBuffer(
java.nio.ByteBuffer b) throws java.io.IOException {
return DECODER.decode(b);
}
// Record fields; get(int)/put(int, Object) address them by position 0, 1, 2.
private CharSequence tin;
private CharSequence ach;
private CharSequence vcp;
/**
* Default constructor. Note that this does not initialize fields
* to their default values from the schema. If that is desired then
* one should use <code>newBuilder()</code>.
*/
public pengine() {}
/**
* All-args constructor.
* @param tin The new value for tin
* @param ach The new value for ach
* @param vcp The new value for vcp
*/
public pengine(CharSequence tin, CharSequence ach, CharSequence vcp) {
this.tin = tin;
this.ach = ach;
this.vcp = vcp;
}
/** Returns the data model instance used by this class. */
public SpecificData getSpecificData() { return MODEL$; }
/** Returns the schema of this record (same object as {@link #getClassSchema()}). */
public org.apache.avro.Schema getSchema() { return SCHEMA$; }
// Used by DatumWriter. Applications should not call.
// Returns the field at the given position; any index other than 0-2 throws AvroRuntimeException.
public Object get(int field$) {
switch (field$) {
case 0: return tin;
case 1: return ach;
case 2: return vcp;
default: throw new org.apache.avro.AvroRuntimeException("Bad index");
}
}
// Used by DatumReader. Applications should not call.
// Stores the value at the given position (cast to CharSequence); any index other than 0-2 throws AvroRuntimeException.
@SuppressWarnings(value="unchecked")
public void put(int field$, Object value$) {
switch (field$) {
case 0: tin = (CharSequence)value$; break;
case 1: ach = (CharSequence)value$; break;
case 2: vcp = (CharSequence)value$; break;
default: throw new org.apache.avro.AvroRuntimeException("Bad index");
}
}
/**
* Gets the value of the 'tin' field.
* @return The value of the 'tin' field.
*/
public CharSequence getTin() {
return tin;
}
/**
* Sets the value of the 'tin' field.
* @param value the value to set.
*/
public void setTin(CharSequence value) {
this.tin = value;
}
/**
* Gets the value of the 'ach' field.
* @return The value of the 'ach' field.
*/
public CharSequence getAch() {
return ach;
}
/**
* Sets the value of the 'ach' field.
* @param value the value to set.
*/
public void setAch(CharSequence value) {
this.ach = value;
}
/**
* Gets the value of the 'vcp' field.
* @return The value of the 'vcp' field.
*/
public CharSequence getVcp() {
return vcp;
}
/**
* Sets the value of the 'vcp' field.
* @param value the value to set.
*/
public void setVcp(CharSequence value) {
this.vcp = value;
}
/**
* Creates a new pengine RecordBuilder.
* @return A new pengine RecordBuilder
*/
public static pengine.Builder newBuilder() {
return new pengine.Builder();
}
/**
* Creates a new pengine RecordBuilder by copying an existing Builder.
* @param other The existing builder to copy.
* @return A new pengine RecordBuilder
*/
public static pengine.Builder newBuilder(pengine.Builder other) {
// A null source yields an empty builder rather than an exception.
if (other == null) {
return new pengine.Builder();
} else {
return new pengine.Builder(other);
}
}
/**
* Creates a new pengine RecordBuilder by copying an existing pengine instance.
* @param other The existing instance to copy.
* @return A new pengine RecordBuilder
*/
public static pengine.Builder newBuilder(pengine other) {
// A null source yields an empty builder rather than an exception.
if (other == null) {
return new pengine.Builder();
} else {
return new pengine.Builder(other);
}
}
/**
* RecordBuilder for pengine instances.
*/
@org.apache.avro.specific.AvroGenerated
public static class Builder extends org.apache.avro.specific.SpecificRecordBuilderBase<pengine>
implements org.apache.avro.data.RecordBuilder<pengine> {
// Pending field values; fieldSetFlags()[i] records whether field i was explicitly set.
private CharSequence tin;
private CharSequence ach;
private CharSequence vcp;
/** Creates a new Builder */
private Builder() {
super(SCHEMA$);
}
/**
* Creates a Builder by copying an existing Builder.
* @param other The existing Builder to copy.
*/
private Builder(pengine.Builder other) {
super(other);
// Each field is deep-copied only if its value passes isValidValue; the set-flag is taken from the source builder.
if (isValidValue(fields()[0], other.tin)) {
this.tin = data().deepCopy(fields()[0].schema(), other.tin);
fieldSetFlags()[0] = other.fieldSetFlags()[0];
}
if (isValidValue(fields()[1], other.ach)) {
this.ach = data().deepCopy(fields()[1].schema(), other.ach);
fieldSetFlags()[1] = other.fieldSetFlags()[1];
}
if (isValidValue(fields()[2], other.vcp)) {
this.vcp = data().deepCopy(fields()[2].schema(), other.vcp);
fieldSetFlags()[2] = other.fieldSetFlags()[2];
}
}
/**
* Creates a Builder by copying an existing pengine instance
* @param other The existing instance to copy.
*/
private Builder(pengine other) {
super(SCHEMA$);
// Each field is deep-copied only if its value passes isValidValue; copied fields are marked as set.
if (isValidValue(fields()[0], other.tin)) {
this.tin = data().deepCopy(fields()[0].schema(), other.tin);
fieldSetFlags()[0] = true;
}
if (isValidValue(fields()[1], other.ach)) {
this.ach = data().deepCopy(fields()[1].schema(), other.ach);
fieldSetFlags()[1] = true;
}
if (isValidValue(fields()[2], other.vcp)) {
this.vcp = data().deepCopy(fields()[2].schema(), other.vcp);
fieldSetFlags()[2] = true;
}
}
/**
* Gets the value of the 'tin' field.
* @return The value.
*/
public CharSequence getTin() {
return tin;
}
/**
* Sets the value of the 'tin' field.
* @param value The value of 'tin'.
* @return This builder.
*/
public pengine.Builder setTin(CharSequence value) {
validate(fields()[0], value);
this.tin = value;
fieldSetFlags()[0] = true;
return this;
}
/**
* Checks whether the 'tin' field has been set.
* @return True if the 'tin' field has been set, false otherwise.
*/
public boolean hasTin() {
return fieldSetFlags()[0];
}
/**
* Clears the value of the 'tin' field.
* @return This builder.
*/
public pengine.Builder clearTin() {
tin = null;
fieldSetFlags()[0] = false;
return this;
}
/**
* Gets the value of the 'ach' field.
* @return The value.
*/
public CharSequence getAch() {
return ach;
}
/**
* Sets the value of the 'ach' field.
* @param value The value of 'ach'.
* @return This builder.
*/
public pengine.Builder setAch(CharSequence value) {
validate(fields()[1], value);
this.ach = value;
fieldSetFlags()[1] = true;
return this;
}
/**
* Checks whether the 'ach' field has been set.
* @return True if the 'ach' field has been set, false otherwise.
*/
public boolean hasAch() {
return fieldSetFlags()[1];
}
/**
* Clears the value of the 'ach' field.
* @return This builder.
*/
public pengine.Builder clearAch() {
ach = null;
fieldSetFlags()[1] = false;
return this;
}
/**
* Gets the value of the 'vcp' field.
* @return The value.
*/
public CharSequence getVcp() {
return vcp;
}
/**
* Sets the value of the 'vcp' field.
* @param value The value of 'vcp'.
* @return This builder.
*/
public pengine.Builder setVcp(CharSequence value) {
validate(fields()[2], value);
this.vcp = value;
fieldSetFlags()[2] = true;
return this;
}
/**
* Checks whether the 'vcp' field has been set.
* @return True if the 'vcp' field has been set, false otherwise.
*/
public boolean hasVcp() {
return fieldSetFlags()[2];
}
/**
* Clears the value of the 'vcp' field.
* @return This builder.
*/
public pengine.Builder clearVcp() {
vcp = null;
fieldSetFlags()[2] = false;
return this;
}
/**
* Builds a new pengine from this builder. Fields that were explicitly set use
* the builder's value; the others use the result of defaultValue(field).
* @return the newly built record
* @throws org.apache.avro.AvroMissingFieldException rethrown unchanged when raised while building
* @throws org.apache.avro.AvroRuntimeException wrapping any other exception raised while building
*/
@Override
@SuppressWarnings("unchecked")
public pengine build() {
try {
pengine record = new pengine();
record.tin = fieldSetFlags()[0] ? this.tin : (CharSequence) defaultValue(fields()[0]);
record.ach = fieldSetFlags()[1] ? this.ach : (CharSequence) defaultValue(fields()[1]);
record.vcp = fieldSetFlags()[2] ? this.vcp : (CharSequence) defaultValue(fields()[2]);
return record;
} catch (org.apache.avro.AvroMissingFieldException e) {
throw e;
} catch (Exception e) {
throw new org.apache.avro.AvroRuntimeException(e);
}
}
}
// Datum writer backing writeExternal (java.io.Externalizable-style serialization).
@SuppressWarnings("unchecked")
private static final org.apache.avro.io.DatumWriter<pengine>
WRITER$ = (org.apache.avro.io.DatumWriter<pengine>)MODEL$.createDatumWriter(SCHEMA$);
// Writes this record to the given ObjectOutput through WRITER$.
@Override public void writeExternal(java.io.ObjectOutput out)
throws java.io.IOException {
WRITER$.write(this, SpecificData.getEncoder(out));
}
// Datum reader backing readExternal.
@SuppressWarnings("unchecked")
private static final org.apache.avro.io.DatumReader<pengine>
READER$ = (org.apache.avro.io.DatumReader<pengine>)MODEL$.createDatumReader(SCHEMA$);
// Populates this record from the given ObjectInput through READER$.
@Override public void readExternal(java.io.ObjectInput in)
throws java.io.IOException {
READER$.read(this, SpecificData.getDecoder(in));
}
// Signals that customEncode/customDecode below are provided by this class.
@Override protected boolean hasCustomCoders() { return true; }
// Writes the three fields as strings in schema order: tin, ach, vcp.
@Override public void customEncode(org.apache.avro.io.Encoder out)
throws java.io.IOException
{
out.writeString(this.tin);
out.writeString(this.ach);
out.writeString(this.vcp);
}
// Reads the three string fields. When readFieldOrderIfDiff() returns null the
// fields are read in schema order; otherwise they are read in the order it
// reports, dispatched by field position.
@Override public void customDecode(org.apache.avro.io.ResolvingDecoder in)
throws java.io.IOException
{
org.apache.avro.Schema.Field[] fieldOrder = in.readFieldOrderIfDiff();
if (fieldOrder == null) {
// The current value is handed to readString only when it is a Utf8
// (presumably so the decoder can reuse that instance); otherwise null is passed.
this.tin = in.readString(this.tin instanceof Utf8 ? (Utf8)this.tin : null);
this.ach = in.readString(this.ach instanceof Utf8 ? (Utf8)this.ach : null);
this.vcp = in.readString(this.vcp instanceof Utf8 ? (Utf8)this.vcp : null);
} else {
for (int i = 0; i < 3; i++) {
switch (fieldOrder[i].pos()) {
case 0:
this.tin = in.readString(this.tin instanceof Utf8 ? (Utf8)this.tin : null);
break;
case 1:
this.ach = in.readString(this.ach instanceof Utf8 ? (Utf8)this.ach : null);
break;
case 2:
this.vcp = in.readString(this.vcp instanceof Utf8 ? (Utf8)this.vcp : null);
break;
default:
// A position outside 0-2 cannot belong to this schema.
throw new java.io.IOException("Corrupt ResolvingDecoder.");
}
}
}
}
}