Реестр схем Hortonworks + Nifi + Java: десериализация записи Nifi - PullRequest
2 голосов
/ 19 февраля 2020

Я пытаюсь десериализовать некоторые сообщения Kafka, которые были сериализованы Nifi, с использованием реестра схем Hortonworks

  • Процессор, используемый на стороне Nifi в качестве RecordWritter: AvroRecordSetWriter
  • Стратегия записи схемы: Справочник по схеме с кодированием HWX

Я могу десериализовать эти сообщения другим пользователям Nifi kafka. Однако я пытаюсь десериализовать их из моего приложения Flink с помощью кода Kafka.

У меня есть следующее в обработчике десериализатора Kafka моего приложения Flink:

final String SCHEMA_REGISTRY_CACHE_SIZE_KEY = SchemaRegistryClient.Configuration.CLASSLOADER_CACHE_SIZE.name();
final String SCHEMA_REGISTRY_CACHE_EXPIRY_INTERVAL_SECS_KEY = SchemaRegistryClient.Configuration.CLASSLOADER_CACHE_EXPIRY_INTERVAL_SECS.name();
final String SCHEMA_REGISTRY_SCHEMA_VERSION_CACHE_SIZE_KEY = SchemaRegistryClient.Configuration.SCHEMA_VERSION_CACHE_SIZE.name();
final String SCHEMA_REGISTRY_SCHEMA_VERSION_CACHE_EXPIRY_INTERVAL_SECS_KEY = SchemaRegistryClient.Configuration.SCHEMA_VERSION_CACHE_EXPIRY_INTERVAL_SECS.name();
final String SCHEMA_REGISTRY_URL_KEY = SchemaRegistryClient.Configuration.SCHEMA_REGISTRY_URL.name();

Properties schemaRegistryProperties = new Properties();
schemaRegistryProperties.put(SCHEMA_REGISTRY_CACHE_SIZE_KEY, 10L);
schemaRegistryProperties.put(SCHEMA_REGISTRY_CACHE_EXPIRY_INTERVAL_SECS_KEY, 5000L);
schemaRegistryProperties.put(SCHEMA_REGISTRY_SCHEMA_VERSION_CACHE_SIZE_KEY, 1000L);
schemaRegistryProperties.put(SCHEMA_REGISTRY_SCHEMA_VERSION_CACHE_EXPIRY_INTERVAL_SECS_KEY, 60 * 60 * 1000L);
schemaRegistryProperties.put(SCHEMA_REGISTRY_URL_KEY, "http://schema_registry_server:7788/api/v1");
return (Map<String, Object>) HWXSchemaRegistry.getInstance(schemaRegistryProperties).deserialize(message);

А вот HWXSchemaRegistryCode для десериализовать сообщение:

import com.hortonworks.registries.schemaregistry.avro.AvroSchemaProvider;
import com.hortonworks.registries.schemaregistry.client.SchemaRegistryClient;
import com.hortonworks.registries.schemaregistry.errors.SchemaNotFoundException;
import com.hortonworks.registries.schemaregistry.serdes.avro.AvroSnapshotDeserializer;

public class HWXSchemaRegistry {

    private SchemaRegistryClient client;
    private Map<String,Object> config;
    private AvroSnapshotDeserializer deserializer;
    private static HWXSchemaRegistry hwxSRInstance = null;

    public static HWXSchemaRegistry getInstance(Properties schemaRegistryConfig) {
        if(hwxSRInstance == null)
            hwxSRInstance = new HWXSchemaRegistry(schemaRegistryConfig);
        return hwxSRInstance;
    }

    public Object deserialize(byte[] message) throws IOException {

        Object o = hwxSRInstance.deserializer.deserialize(new ByteArrayInputStream(message), null);
        return o;
   }

    private static Map<String,Object> properties2Map(Properties config) {
        Enumeration<Object> keys = config.keys();
        Map<String, Object> configMap = new HashMap<String,Object>();
        while (keys.hasMoreElements()) {
            Object key = (Object) keys.nextElement();
            configMap.put(key.toString(), config.get(key));
        }
        return configMap;
     }

    private HWXSchemaRegistry(Properties schemaRegistryConfig) {
        _log.debug("Init SchemaRegistry Client");
        this.config = HWXSchemaRegistry.properties2Map(schemaRegistryConfig);
        this.client = new SchemaRegistryClient(this.config);

        this.deserializer = this.client.getDefaultDeserializer(AvroSchemaProvider.TYPE);
        this.deserializer.init(this.config);
     }
}

Но я получаю код ошибки 404 HTTP (схема не найдена). Я думаю, это связано с несовместимыми «протоколами» между конфигурацией Nifi и реализацией клиента реестра схемы HWX, поэтому байты идентификатора схемы, которые ищет клиент, не существуют на сервере, или что-то в этом роде.

Может кто-то помощь по этому вопросу?

Спасибо.

Вызвано: javax.ws.rs.NotFoundException: HTTP 404 не найден на org.glassfi sh .jersey.client.JerseyInvocation .convertToException (JerseyInvocation. java: 1069) в org.glassfi sh .jersey.client.JerseyInvocation.translate (JerseyInvocation. java: 866) в org.glassfi sh .jersey.client.JerseyInvocation $ invoke $ 1 (JerseyInvocation. java: 750) в org.glassfi sh .jersey.internal.Errors.process (Ошибки. java: 292) в org.glassfi sh .jersey.internal.Errors. process (Ошибки. java: 274) в org.glassfi sh .jersey.internal.Errors.process (Ошибки. java: 205) в org.glassfi sh .jersey.process.internal.RequestScope. runInScope (RequestScope. java: 390) по адресу org.glassfi sh .jersey.c lient.JerseyInvocation.invoke (JerseyInvocation. java: 748) в org.glassfi sh .jersey.client.JerseyInvocation $ Builder.method (JerseyInvocation. java: 404) в org.glassfi sh .jsey. client.JerseyInvocation $ Builder.get (JerseyInvocation. java: 300) на com.hortonworks.registries.schemaregistry.client.SchemaRegistryClient $ 14.run (SchemaRegistryClient. java: 1054) на com.hortonworks.registriesche.sc .SchemaRegistryClient $ 14.run (SchemaRegistryClient. java: 1051) в java .security.AccessController.doPrivileged (собственный метод) в javax.security.auth.Subject.doAs (Subject. java: 360) в com. hortonworks.registries. schemaregistry.client.SchemaRegistryClient.getAllVersions (SchemaRegistryClient. java: 676) в HWXSchemaRegistry. (HWXSchemaRegistry. * 10 53 *: 56) в HWXSchemaRegistry.getInstance (HWXSchemaRegistry. java: 26) в SchemaService.deserialize (SchemaService. java: 70) в SchemaService.deserialize (SchemaService. java: 26) в орг. * 1057 .flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper. 1061 *: 140) в орг. apache .flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run (FlinkKafkaConsumerBase. java: 712) в орг. apache .flink.streaming.api.operators.StreamSource.run (StreamSource. java: 93) в орг. apache .flink.streaming.api.operators.StreamSource.run (StreamSource. java: 57) в орг. apache .flink.streaming.runtime.tasks .SourceStreamTask.run (SourceStreamTask. java: 97) в орг. apache .flink.streaming.runtime.tasks.StreamTask.invoke (StreamTask. java: 302) в орг. apache .flink.runtime .taskmanager.Task.run (. Задача java 711) в java .lang.Thread.run (поток. java: 745)

1 Ответ

0 голосов
/ 21 февраля 2020

Я нашел обходной путь. Так как я не смог заставить это работать. Я беру первые байты массива байтов, чтобы сделать несколько обращений к реестру схемы, и получаю схему avro для последующей десериализации остальной части массива байтов.

  • Первый байт (0) - версия протокола (I понял, что это Nifi-специфицированный c байт, поскольку он мне не нужен).
  • Следующие 8 байтов - это идентификатор схемы
  • Следующие 4 байта - это версия схемы
  • Остальные байты - это само сообщение:

    import com.hortonworks.registries.schemaregistry.SchemaMetadataInfo;
    import com.hortonworks.registries.schemaregistry.SchemaVersionInfo;
    import com.hortonworks.registries.schemaregistry.SchemaVersionKey;
    import com.hortonworks.registries.schemaregistry.client.SchemaRegistryClient;
    
    try(SchemaRegistryClient client = new SchemaRegistryClient(this.schemaRegistryConfig)) {
        try {
            Long schemaId = ByteBuffer.wrap(Arrays.copyOfRange(message, 1, 9)).getLong();
            Integer schemaVersion =  ByteBuffer.wrap(Arrays.copyOfRange(message, 9, 13)).getInt();
    
    
            SchemaMetadataInfo schemaInfo = client.getSchemaMetadataInfo(schemaId);
            String schemaName = schemaInfo.getSchemaMetadata().getName();
    
            SchemaVersionInfo schemaVersionInfo = client.getSchemaVersionInfo(
                    new SchemaVersionKey(schemaName, schemaVersion));   
    
    
            String avroSchema = schemaVersionInfo.getSchemaText();
            byte[] message= Arrays.copyOfRange(message, 13, message.length);
            // Deserialize [...]
        } 
        catch (Exception e) 
        {
            throw new IOException(e.getMessage());
        }
    }
    

Я также подумал, что, возможно, мне пришлось удалить первый байт перед вызовом hwxSRInstance.deserializer.deserialize в моем коде вопроса, поскольку этот байт, кажется, является специфицированным c байтом Nifi для связи между процессорами Nifi, но это не сработало.

Следующим шагом является создание кеша с текстами схемы, чтобы избежать многократный вызов API реестра схемы.

Новая информация: я расширю свой ответ, включив в него часть по десериализации avro, так как для меня это было несколько проблем, и мне пришлось проверить исходный код Nifi Avro Reader на рисунке o Эта часть (я получаю недопустимое исключение данных Avro при попытке использовать базовый код десериализации avro c):

import org.apache.avro.Schema;
import org.apache.avro.file.SeekableByteArrayInput;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DecoderFactory;

private static GenericRecord deserializeMessage(byte[] message, String schemaText) throws IOException {

    InputStream in = new SeekableByteArrayInput(message);
    Schema schema = new Schema.Parser().parse(schemaText);
    DatumReader<GenericRecord> datumReader = new GenericDatumReader<>(schema);
    BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(in,  null);
    GenericRecord genericRecord = null;
    genericRecord = datumReader.read(genericRecord, decoder);
    in.close();

    return genericRecord;
}

Если вы хотите преобразовать GenericRecord в карту, обратите внимание, что строковые значения не являются Строковые объекты, вам нужно привести ключи и значения типов string:

private static Map<String, Object> avroGenericRecordToMap(GenericRecord record)
{
    Map<String, Object> map = new HashMap<>();
    record.getSchema().getFields().forEach(field -> 
        map.put(String.valueOf(field.name()), record.get(field.name())));

    // Strings are maped to Utf8 class, so they need to be casted (all the keys of records and those values which are typed as string)
    if(map.get("value").getClass() ==  org.apache.avro.util.Utf8.class)
        map.put("value", String.valueOf(map.get("value")));

    return map;
}
...