Я разрабатываю C Sharp SDK, который принимает IEnumerable данные, а затем отправляет их в спокойный API. Затем оставшийся API отправит эти записи в Kafka.
У меня уже была схема schemaString (this.schemaString), и вот моя реализация для части сериализации SDK:
public string ValidateAvroSchema<T>(IEnumerable<T> value) {
using(var ms = new MemoryStream()){
try{
Avro.IO.Encoder e = new BinaryEncoder(ms);
var schema = Schema.Parse(this.schemaString) as RecordSchema;
var writer = new GenericDatumWriter<GenericRecord>(schema);
foreach(T item in value) {
GenericRecord record = new GenericRecord(schema);
FieldInfo[] fieldsInfo;
Type typeParameterType = typeof(T);
var type = item.GetType();
fieldsInfo = typeParameterType.GetFields();
for (int i = 0; i < fieldsInfo.Length; i++)
{
record.Add(fieldsInfo[i].Name, GetFieldValue(item, fieldsInfo[i].Name));
}
writer.Write(record, e);
}
// I am passing this string to Restful API so the Java side can parse it
return Convert.ToBase64String(ms.ToArray());;
} catch (AvroException e) {
// handle exception
}
}
}
На API sied я сделал что-то вроде:
byte[] input = Base64.decodeBase64(payloadInJson.toString());
List<GenericRecord> listOfRecords = new ArrayList<>();
GenericDatumReader<GenericRecord> reader = new GenericDatumReader<>(schema);
InputStream inputStream = new ByteArrayInputStream(input);
BinaryDecoder decoder = new DecoderFactory().get().binaryDecoder(inputStream, null);
while(true){
try {
GenericRecord record = reader.read(null, decoder);
listOfRecords.add(record);
} catch (EOFException eof) {
break;
}
}
Это работает сейчас. Спасибо вам, ребята.
Остался только один вопрос.
Вопрос 1: Правильно ли использовать отражение, чтобы получить все свойства и затем добавить их в GenericRecord? Кажется, это очень дорого.
Большое спасибо.