Сбой KafkaAvroDeserializer с исключением из Kyro - PullRequest
1 голос
/ 23 января 2020

Я написал потребителю читать обобщенную c запись Avro с использованием реестра схемы.

FlinkKafkaConsumer010 kafkaConsumer010 = new FlinkKafkaConsumer010(KAFKA_TOPICS,
                new KafkaGenericAvroDeserializationSchema(schemaRegistryUrl),
                properties);

И класс десериализации выглядит так:

public class KafkaGenericAvroDeserializationSchema implements KeyedDeserializationSchema<GenericRecord> {

   private final String registryUrl;
    private transient KafkaAvroDeserializer inner;

    public KafkaGenericAvroDeserializationSchema(String registryUrl) {
        this.registryUrl = registryUrl;
    }

    @Override
    public GenericRecord deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) {
        checkInitialized();
        return (GenericRecord) inner.deserialize(topic, message);
    }

    @Override
    public boolean isEndOfStream(GenericRecord nextElement) {
        return false;
    }

    @Override
    public TypeInformation<GenericRecord> getProducedType() {
        return TypeExtractor.getForClass(GenericRecord.class);
    }

    private void checkInitialized() {
        if (inner == null) {
            Map<String, Object> props = new HashMap<>();
            props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, registryUrl);
            props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, false);
            SchemaRegistryClient client =
                    new CachedSchemaRegistryClient(
                            registryUrl, AbstractKafkaAvroSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_DEFAULT);
            inner = new KafkaAvroDeserializer(client, props);
        }
    }
}

Он работает локально на моей машине, но когда я развернул ее в кластере пряжи, я получаю исключение ниже:

java.lang.Exception: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.checkThrowSourceExecutionException(SourceStreamTask.java:212)

Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:654)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)

Caused by: com.esotericsoftware.kryo.KryoException: Error constructing instance of class: org.apache.avro.Schema$LockableArrayList
Serialization trace:
types (org.apache.avro.Schema$UnionSchema)
schema (org.apache.avro.Schema$Field)
fieldMap (org.apache.avro.Schema$RecordSchema)
schema (org.apache.avro.generic.GenericData$Record)

Caused by: java.lang.IllegalAccessException: Class com.twitter.chill.Instantiators$$anonfun$normalJava$1 can not access a member of class org.apache.avro.Schema$LockableArrayList with modifiers "public"

Пожалуйста, помогите мне решить эту проблему.

1 Ответ

0 голосов
/ 23 января 2020

Как написано в списке рассылки :

Проблема в том, что вы не предоставляете какой-либо значимой информации о типе, поэтому Флинк вынужден прибегнуть к Kryo. Вам нужно извлечь схему во время компиляции запроса (в вашем основном) и передать ее в вашу схему десериализации.

public TypeInformation<T> getProducedType() {
      return (TypeInformation<T>) new GenericRecordAvroTypeInfo(this.schema);
}

Если вы не хотите извлекать ее статически, вам нужно сказать Flink, как обрабатывать произвольные GenericRecords. Вы можете реализовать свой собственный сериализатор , который будет записывать GenericRecords в byte [] и наоборот.

Обратите внимание, что я все же рекомендую просто связать схему с вашим приложением Flink, а не изобретать велосипед .

...