Я нашел обходной путь. Так как я не смог заставить это работать. Я беру первые байты массива байтов, чтобы сделать несколько обращений к реестру схемы, и получаю схему avro для последующей десериализации остальной части массива байтов.
- Первый байт (0) - версия протокола (I понял, что это Nifi-специфицированный c байт, поскольку он мне не нужен).
- Следующие 8 байтов - это идентификатор схемы
- Следующие 4 байта - это версия схемы
Остальные байты - это само сообщение:
import com.hortonworks.registries.schemaregistry.SchemaMetadataInfo;
import com.hortonworks.registries.schemaregistry.SchemaVersionInfo;
import com.hortonworks.registries.schemaregistry.SchemaVersionKey;
import com.hortonworks.registries.schemaregistry.client.SchemaRegistryClient;
try(SchemaRegistryClient client = new SchemaRegistryClient(this.schemaRegistryConfig)) {
try {
Long schemaId = ByteBuffer.wrap(Arrays.copyOfRange(message, 1, 9)).getLong();
Integer schemaVersion = ByteBuffer.wrap(Arrays.copyOfRange(message, 9, 13)).getInt();
SchemaMetadataInfo schemaInfo = client.getSchemaMetadataInfo(schemaId);
String schemaName = schemaInfo.getSchemaMetadata().getName();
SchemaVersionInfo schemaVersionInfo = client.getSchemaVersionInfo(
new SchemaVersionKey(schemaName, schemaVersion));
String avroSchema = schemaVersionInfo.getSchemaText();
byte[] message= Arrays.copyOfRange(message, 13, message.length);
// Deserialize [...]
}
catch (Exception e)
{
throw new IOException(e.getMessage());
}
}
Я также подумал, что, возможно, мне пришлось удалить первый байт перед вызовом hwxSRInstance.deserializer.deserialize
в моем коде вопроса, поскольку этот байт, кажется, является специфицированным c байтом Nifi для связи между процессорами Nifi, но это не сработало.
Следующим шагом является создание кеша с текстами схемы, чтобы избежать многократный вызов API реестра схемы.
Новая информация: я расширю свой ответ, включив в него часть по десериализации avro, так как для меня это было несколько проблем, и мне пришлось проверить исходный код Nifi Avro Reader на рисунке o Эта часть (я получаю недопустимое исключение данных Avro при попытке использовать базовый код десериализации avro c):
import org.apache.avro.Schema;
import org.apache.avro.file.SeekableByteArrayInput;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DecoderFactory;
private static GenericRecord deserializeMessage(byte[] message, String schemaText) throws IOException {
InputStream in = new SeekableByteArrayInput(message);
Schema schema = new Schema.Parser().parse(schemaText);
DatumReader<GenericRecord> datumReader = new GenericDatumReader<>(schema);
BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(in, null);
GenericRecord genericRecord = null;
genericRecord = datumReader.read(genericRecord, decoder);
in.close();
return genericRecord;
}
Если вы хотите преобразовать GenericRecord в карту, обратите внимание, что строковые значения не являются Строковые объекты, вам нужно привести ключи и значения типов string:
private static Map<String, Object> avroGenericRecordToMap(GenericRecord record)
{
Map<String, Object> map = new HashMap<>();
record.getSchema().getFields().forEach(field ->
map.put(String.valueOf(field.name()), record.get(field.name())));
// Strings are maped to Utf8 class, so they need to be casted (all the keys of records and those values which are typed as string)
if(map.get("value").getClass() == org.apache.avro.util.Utf8.class)
map.put("value", String.valueOf(map.get("value")));
return map;
}