Я нашел, как это сделать.Есть два способа:
- использовать avro-tool.jar из C #;
- использовать библиотеку Apache Avro (рекомендуется).
1 ° РешениеСначала получите байты в сообщении данных о событии и сохраните его локально.
public List<string> SaveAvroBytesOnFile(EventData eventHubMessage, string functionAppDirectory)
{
try
{
string fileName = "avro-bytes.avro";
List<string> filesToProcess = new List<string>();
string singleFileNameToSave = fileName;
filesToProcess.Add(singleFileNameToSave);
string path = Path.Combine(functionAppDirectory,"AvroBytesFiles");
System.IO.Directory.CreateDirectory(path);
File.WriteAllBytes($"{path}{singleFileNameToSave}", eventHubMessage.GetBytes());
return filesToProcess;
}
catch (Exception ex)
{
throw;
}
}
Затем вызовите avro-tool.jar из функции azure и перенаправьте вывод в переменную
Process myProcess = new Process();
myProcess.StartInfo.UseShellExecute = false;
myProcess.StartInfo.FileName = @"D:\Program Files\Java\jdk1.8.0_73\bin\java.exe";
// execute avro tools
string avroResourcesPath = Path.Combine(functionAppDirectory, "AvroResources");
// here you must use the file with the bytes saved before and the avroschema file
myProcess.StartInfo.Arguments = $"-jar {Path.Combine(avroResourcesPath, "avro-tools-1.8.2.jar")} fragtojson --schema-file {Path.Combine(avroResourcesPath, "schemafile.avsc")} {Path.Combine(functionAppDirectory, "AvroBytesFiles", byteFileNames[i])}";
myProcess.StartInfo.RedirectStandardOutput = true;
myProcess.Start();
// print the output to a string
string output = myProcess.StandardOutput.ReadToEnd();
myProcess.WaitForExit();
Avro-tool может десериализовать байты с схемой, отличной от той, которая вам нужна, поэтому вам нужно отобразить модель avro-tool в вашей модели.Этот шаг может потреблять много ресурсов при изменении сложности модели.
AvroToolModel avroToolModel= JsonConvert.DeserializeObject<AvroTool>(output);
// map the avro model in my model
MyMode myModel = new MyModel(avroToolModel);
2 ° Решение
Это рекомендуемое решение.Десериализацию можно выполнить несколькими строками.
string schema = @"...";
using (MemoryStream memStream = new MemoryStream(eventHubMessage.GetBytes()))
{
memStream.Seek(0, SeekOrigin.Begin);
Schema writerSchema = Schema.Parse(schema);
Avro.Specific.SpecificDatumReader<MyModel> r = new Avro.Specific.SpecificDatumReader<MyModel>(writerSchema, writerSchema);
output = r.Read(null, new Avro.IO.BinaryDecoder(memStream));
}
Класс модели должен реализовывать интерфейс ISpecificRecord следующим образом:
[DataContract]
public class MyModel: ISpecificRecord
{
[DataMember]
public string Id;
[DataMember]
public enumP Type;
[DataMember]
public long Timestamp;
public Dictionary<string, string> Context;
public static Schema _SCHEMA = Avro.Schema.Parse(@"...");
public virtual Schema Schema
{
get
{
return Position._SCHEMA;
}
}
public object Get(int fieldPos)
{
switch (fieldPos)
{
case 0: return this.Id;
case 1: return this.Timestamp;
case 2: return this.Type;
case 3: return this.Context;
default: throw new AvroRuntimeException("Bad index " + fieldPos + " in Get()");
};
}
public void Put(int fieldPos, object fieldValue)
{
switch (fieldPos)
{
case 0: this.Id = (System.String)fieldValue; break;
case 1: this.Timestamp = (System.Int64)fieldValue; break;
case 2: this.Type = (enumP)fieldValue; break;
case 3: this.Context = (Dictionary<string,string>)fieldValue; break;
default: throw new AvroRuntimeException("Bad index " + fieldPos + " in Put()");
};
}
}
[DataContract]
public enum enumP
{
ONE, TWO, THREE
}
Имена свойств в классе MyModel должны бытьто же самое в используемой схеме.