Добавить схему к фрагменту Avro .Net - PullRequest
0 голосов
/ 15 марта 2019

Проблема в следующем.

Шаги:

  1. Приложение преобразует некоторый пользовательский объект в фрагмент avro (байтовый массив);
  2. Этот фрагмент avro отправляется в концентратор событий в объекте EventData;
  3. Концентратор событий запускает функцию Azure, которая получает Mcrosoft.ServiceBus.Messaging.EventData от концентратора событий;
  4. Я могу извлечь тело EventData, и оно содержит фрагмент avro (байтовый массив) точки 1.

Я использую Microsoft.Hadoop.Avro.

У меня есть схема исходного пользовательского объекта (пункт 1), поэтому я попытался создать универсальный считыватель, который читает фрагмент avro, но я получаю следующую ошибку:

Недопустимый контейнер объекта Avro в потоке. Заголовок не может быть распознан.

Похоже, что Microsoft.Hadoop.Avro ​​может управлять только полным файлом avro (заголовок + схема + тело), ​​но не фрагментом avro (тело).

С помощью java avro-tool я могу добавить схему к фрагменту avro. Возможно ли это также в .Net или .Net Core? Как я могу это сделать?

Для простоты в следующем коде я заменил EventData, который поступает из концентратора событий, на связанный файл avro.

using (Stream stream = new FileStream(@"...\trip-real-0-2019-03-14-12-14.avro", FileMode.Open, FileAccess.Read, FileShare.Read))
{
    // create a generic reader for the event hub avro message
    using (var reader = AvroContainer.CreateGenericReader(stream))
    {
        while (reader.MoveNext())
        {
            foreach (dynamic record in reader.Current.Objects)
            {
                //get the body of the event hub message (fragment avro bytes)
                var avroFragmentByeArray = (byte[])(record.Body);

                // try to create a generic reader with the schema.
                // this line throws an exception
                using (var r = AvroContainer.CreateGenericReader(schema, new MemoryStream(avroFragmentByeArray), true, new CodecFactory()))                                    
                {

                }
            }
        }
    }
}

1 Ответ

0 голосов
/ 23 марта 2019

Я нашел, как это сделать.Есть два способа:

  1. использовать avro-tool.jar из C #;
  2. использовать библиотеку Apache Avro (рекомендуется).

1 ° РешениеСначала получите байты в сообщении данных о событии и сохраните его локально.

public List<string> SaveAvroBytesOnFile(EventData eventHubMessage, string functionAppDirectory)
    {
        try
        {                
            string fileName = "avro-bytes.avro";
            List<string> filesToProcess = new List<string>();
            string singleFileNameToSave = fileName;
            filesToProcess.Add(singleFileNameToSave);              
            string path = Path.Combine(functionAppDirectory,"AvroBytesFiles");  
            System.IO.Directory.CreateDirectory(path);              
            File.WriteAllBytes($"{path}{singleFileNameToSave}", eventHubMessage.GetBytes());                
            return filesToProcess;
        }
        catch (Exception ex)
        {
            throw;
        }
    }

Затем вызовите avro-tool.jar из функции azure и перенаправьте вывод в переменную

 Process myProcess = new Process();
 myProcess.StartInfo.UseShellExecute = false;
 myProcess.StartInfo.FileName = @"D:\Program Files\Java\jdk1.8.0_73\bin\java.exe";                   
 // execute avro tools         
 string avroResourcesPath = Path.Combine(functionAppDirectory, "AvroResources");
 // here you must use the file with the bytes saved before and the avroschema file
 myProcess.StartInfo.Arguments = $"-jar {Path.Combine(avroResourcesPath, "avro-tools-1.8.2.jar")} fragtojson --schema-file {Path.Combine(avroResourcesPath, "schemafile.avsc")} {Path.Combine(functionAppDirectory, "AvroBytesFiles", byteFileNames[i])}";
 myProcess.StartInfo.RedirectStandardOutput = true;
 myProcess.Start();
 // print the output to a string 
 string output = myProcess.StandardOutput.ReadToEnd();
 myProcess.WaitForExit();

Avro-tool может десериализовать байты с схемой, отличной от той, которая вам нужна, поэтому вам нужно отобразить модель avro-tool в вашей модели.Этот шаг может потреблять много ресурсов при изменении сложности модели.

AvroToolModel avroToolModel= JsonConvert.DeserializeObject<AvroTool>(output);
// map the avro model in my model
MyMode myModel = new MyModel(avroToolModel);

2 ° Решение

Это рекомендуемое решение.Десериализацию можно выполнить несколькими строками.

string schema = @"...";
using (MemoryStream memStream = new MemoryStream(eventHubMessage.GetBytes()))
{
   memStream.Seek(0, SeekOrigin.Begin);
   Schema writerSchema = Schema.Parse(schema);
   Avro.Specific.SpecificDatumReader<MyModel> r = new Avro.Specific.SpecificDatumReader<MyModel>(writerSchema, writerSchema);
   output = r.Read(null, new Avro.IO.BinaryDecoder(memStream));
}

Класс модели должен реализовывать интерфейс ISpecificRecord следующим образом:

[DataContract]
public class MyModel: ISpecificRecord
{
    [DataMember]
    public string Id;
    [DataMember]
    public enumP Type;
    [DataMember]
    public long Timestamp;
    public Dictionary<string, string> Context;

    public static Schema _SCHEMA = Avro.Schema.Parse(@"...");

    public virtual Schema Schema
    {
        get
        {
            return Position._SCHEMA;
        }
    }

    public object Get(int fieldPos)
    {
        switch (fieldPos)
        {
            case 0: return this.Id;
            case 1: return this.Timestamp;
            case 2: return this.Type;                
            case 3: return this.Context;
            default: throw new AvroRuntimeException("Bad index " + fieldPos + " in Get()");
        };
    }

    public void Put(int fieldPos, object fieldValue)
    {
        switch (fieldPos)
        {
            case 0: this.Id = (System.String)fieldValue; break;
            case 1: this.Timestamp = (System.Int64)fieldValue; break;
            case 2: this.Type = (enumP)fieldValue; break;                
            case 3: this.Context = (Dictionary<string,string>)fieldValue; break;
            default: throw new AvroRuntimeException("Bad index " + fieldPos + " in Put()");
        };
    }
}

[DataContract]
public enum enumP
{
    ONE, TWO, THREE
}

Имена свойств в классе MyModel должны бытьто же самое в используемой схеме.

...