Как исправить поиск в реестре кэшированной схемы, вызывающий ужасную производительность - PullRequest
0 голосов
/ 26 июня 2019

Редактировать: я обнаружил этот другой вопрос несколько лет назад ( Как заполнить кэш в CachedSchemaRegistryClient, не совершая вызова для регистрации новой схемы? ).В нем упоминается, что CachedSchemaRegistryClient необходимо зарегистрировать схему в фактическом реестре, чтобы сделать ее кэшированной, и пока не было найдено решения для обхода этой проблемы.Итак, оставив мой вопрос здесь, но хотел, чтобы об этом тоже знали.

Я работаю над программой, которая извлекает байтовый массив из kafka, расшифровывает его (чтобы он был безопасен при работе с kafka), конвертируетбайты в строку, строка json в объект json, поиск схемы из реестра схемы (с использованием CachedSchemaRegistryClient), преобразование байтов json в общую запись с использованием схемы из извлеченной схемы из метаданных реестра, а затем сериализация этогообщая запись в байты avro.

После выполнения некоторых тестов создается впечатление, что CachedSchemaRegistyClient является основным фактором снижения производительности.Но из того, что я могу сказать, это лучший способ получить метаданные схемы.Я что-то плохо реализовал или есть другой способ сделать это, который работает с моим вариантом использования?

Вот код для того, что обрабатывает все после расшифровки:

package org.apache.flink;

import avro.fullNested.FinalMessage;
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaMetadata;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.util.Collector;
import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;
import serializers.AvroFinishedMessageSerializer;
import tech.allegro.schema.json2avro.converter.JsonAvroConverter;

public class JsonToAvroBytesParser implements FlatMapFunction<String, byte[]> {

    private transient CachedSchemaRegistryClient schemaRegistryClient;
    private transient AvroFinishedMessageSerializer avroFinishedMessageSerializer;
    private String schemaUrl;
    private Integer identityMaxCount;

    public JsonToAvroBytesParser(String passedSchemaUrl, int passedImc){
        schemaUrl = passedSchemaUrl;
        identityMaxCount = passedImc;
    }

    private void ensureInitialized() {
        if (schemaUrl.equals("")) {
            schemaUrl = "https://myschemaurl.com/";
        }
        if(identityMaxCount == null){
            identityMaxCount = 5;
        }
        if(schemaRegistryClient == null){
            schemaRegistryClient = new CachedSchemaRegistryClient(schemaUrl, identityMaxCount);
        }
        if(avroFinalMessageSerializer == null){
            avroFinalMessageSerializer = new AvroFinalMessageSerializer(FinalMessage.class);
        }
    }

    @Override
    public void flatMap(String s, Collector<byte[]> collector) throws Exception {

        ensureInitialized();

        Object obj = new JSONParser().parse(s);
        JSONObject jsonObject = (JSONObject) obj;

        try {
            String headers = jsonObject.get("headers").toString();
            JSONObject body = (JSONObject) jsonObject.get("requestBody");
            if(headers != null && body != null){
                String kafkaTopicFromHeaders = "hard_coded_name-value";
                //NOTE: this schema lookup has serious performance issues.
                SchemaMetadata schemaMetadata = schemaRegistryClient.getLatestSchemaMetadata(kafkaTopicFromHeaders);
                //TODO: need to implement recovery method if schema cannot be reached.

                JsonAvroConverter converter = new JsonAvroConverter();
                GenericRecord specificRecord = converter.convertToGenericDataRecord(body.toJSONString().getBytes(), new Schema.Parser().parse(schemaMetadata.getSchema()));
                byte[] bytesToReturn = avroFinishedMessageSerializer.serializeWithSchemaId(schemaMetadata, specificRecord);

                collector.collect(bytesToReturn);
            }
            else {
                System.out.println("json is incorrect.");
            }
        } catch (Exception e){
            System.out.println("json conversion exception caught");
        }
    }
}

Спасибо за любую помощь заранее!

...