Редактировать: я обнаружил этот другой вопрос несколько лет назад ( Как заполнить кэш в CachedSchemaRegistryClient, не совершая вызова для регистрации новой схемы? ).В нем упоминается, что CachedSchemaRegistryClient необходимо зарегистрировать схему в фактическом реестре, чтобы сделать ее кэшированной, и пока не было найдено решения для обхода этой проблемы.Итак, оставив мой вопрос здесь, но хотел, чтобы об этом тоже знали.
Я работаю над программой, которая извлекает байтовый массив из kafka, расшифровывает его (чтобы он был безопасен при работе с kafka), конвертируетбайты в строку, строка json в объект json, поиск схемы из реестра схемы (с использованием CachedSchemaRegistryClient), преобразование байтов json в общую запись с использованием схемы из извлеченной схемы из метаданных реестра, а затем сериализация этогообщая запись в байты avro.
После выполнения некоторых тестов создается впечатление, что CachedSchemaRegistyClient является основным фактором снижения производительности.Но из того, что я могу сказать, это лучший способ получить метаданные схемы.Я что-то плохо реализовал или есть другой способ сделать это, который работает с моим вариантом использования?
Вот код для того, что обрабатывает все после расшифровки:
package org.apache.flink;
import avro.fullNested.FinalMessage;
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaMetadata;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.util.Collector;
import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;
import serializers.AvroFinishedMessageSerializer;
import tech.allegro.schema.json2avro.converter.JsonAvroConverter;
public class JsonToAvroBytesParser implements FlatMapFunction<String, byte[]> {
private transient CachedSchemaRegistryClient schemaRegistryClient;
private transient AvroFinishedMessageSerializer avroFinishedMessageSerializer;
private String schemaUrl;
private Integer identityMaxCount;
public JsonToAvroBytesParser(String passedSchemaUrl, int passedImc){
schemaUrl = passedSchemaUrl;
identityMaxCount = passedImc;
}
private void ensureInitialized() {
if (schemaUrl.equals("")) {
schemaUrl = "https://myschemaurl.com/";
}
if(identityMaxCount == null){
identityMaxCount = 5;
}
if(schemaRegistryClient == null){
schemaRegistryClient = new CachedSchemaRegistryClient(schemaUrl, identityMaxCount);
}
if(avroFinalMessageSerializer == null){
avroFinalMessageSerializer = new AvroFinalMessageSerializer(FinalMessage.class);
}
}
@Override
public void flatMap(String s, Collector<byte[]> collector) throws Exception {
ensureInitialized();
Object obj = new JSONParser().parse(s);
JSONObject jsonObject = (JSONObject) obj;
try {
String headers = jsonObject.get("headers").toString();
JSONObject body = (JSONObject) jsonObject.get("requestBody");
if(headers != null && body != null){
String kafkaTopicFromHeaders = "hard_coded_name-value";
//NOTE: this schema lookup has serious performance issues.
SchemaMetadata schemaMetadata = schemaRegistryClient.getLatestSchemaMetadata(kafkaTopicFromHeaders);
//TODO: need to implement recovery method if schema cannot be reached.
JsonAvroConverter converter = new JsonAvroConverter();
GenericRecord specificRecord = converter.convertToGenericDataRecord(body.toJSONString().getBytes(), new Schema.Parser().parse(schemaMetadata.getSchema()));
byte[] bytesToReturn = avroFinishedMessageSerializer.serializeWithSchemaId(schemaMetadata, specificRecord);
collector.collect(bytesToReturn);
}
else {
System.out.println("json is incorrect.");
}
} catch (Exception e){
System.out.println("json conversion exception caught");
}
}
}
Спасибо за любую помощь заранее!