Amazon Kinesis Data Analytics для Java приложений: ошибка Avro при десериализации входящих сообщений - PullRequest
2 голосов
/ 09 января 2020

Я пытался развернуть свое приложение Flink в AWS Kinesis Data Analytics. Это приложение использует Apache Avro для десериализации / сериализации входящих сообщений. Мое приложение отлично работает на моем локальном компьютере, но при развертывании его на AWS у меня возникает исключение (в журналах CloudWatch): Caused by: java.io.InvalidClassException: org.apache.avro.specific.SpecificRecordBase; local class incompatible: stream classdesc serialVersionUID = 4445917349737100331, local class serialVersionUID = -1463700717714793795

Данные журнала:

{
  "locationInformation": "org.apache.flink.runtime.taskmanager.Task.transitionState(Task.java:913)",
  "logger": "org.apache.flink.runtime.taskmanager.Task",
  "message": "Source: Custom Source -> Sink: Unnamed (1/1) (a72ff69f9dc0f9e56d1104ce21456a5d) switched from RUNNING to FAILED.",
  "throwableInformation": [
    "org.apache.flink.streaming.runtime.tasks.StreamTaskException: Could not instantiate serializer.",
    "\tat org.apache.flink.streaming.api.graph.StreamConfig.getTypeSerializerIn1(StreamConfig.java:160)",
    "\tat org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:380)",
    "\tat org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:296)",
    "\tat org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:133)",
    "\tat org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:275)",
    "\tat org.apache.flink.runtime.taskmanager.Task.run(Task.java:714)",
    "\tat java.lang.Thread.run(Thread.java:748)",
    "Caused by: java.io.InvalidClassException: org.apache.avro.specific.SpecificRecordBase; local class incompatible: stream classdesc serialVersionUID = 4445917349737100331, local class serialVersionUID = -1463700717714793795",
    "\tat java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:699)",
    "\tat java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1885)",
    "\tat java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1751)",
    "\tat java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1885)",
    "\tat java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1751)",
    "\tat java.io.ObjectInputStream.readClass(ObjectInputStream.java:1716)",
    "\tat java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1556)",
    "\tat java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)",
    "\tat org.apache.flink.formats.avro.typeutils.AvroSerializer.readCurrentLayout(AvroSerializer.java:465)",
    "\tat org.apache.flink.formats.avro.typeutils.AvroSerializer.readObject(AvroSerializer.java:432)",
    "\tat sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)",
    "\tat sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)",
    "\tat sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)",
    "\tat java.lang.reflect.Method.invoke(Method.java:498)",
    "\tat java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1170)",
    "\tat java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2178)",
    "\tat java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)",
    "\tat java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)",
    "\tat java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)",
    "\tat org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:566)",
    "\tat org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:552)",
    "\tat org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:540)",
    "\tat org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:501)",
    "\tat org.apache.flink.streaming.api.graph.StreamConfig.getTypeSerializerIn1(StreamConfig.java:158)",
    "\t... 6 more"
  ],
  "threadName": "Source: Custom Source -> Sink: Unnamed (1/1)",
  "applicationARN": "arn:aws:kinesisanalytics:us-east-1:829044228870:application/poc-kda",
  "applicationVersionId": "8",
  "messageSchemaVersion": "1",
  "messageType": "INFO"
}

Я использую версии библиотек:

  • Apache Avro - 1.9.1
  • Apache Flink - 1.9.1
  • Библиотека производителя Kinesis - 0.13.1
  • AWS Flink - 1.8

Примечание, та же проблема, если я использую Apache Flink - 1.8, 1.6

KDA Код Flink:

public class KinesisExampleKDA {
   private static final String REGION = "us-east-1";

   public static void main(String[] args) throws Exception {
       Properties consumerConfig = new Properties();
       consumerConfig.put(AWSConfigConstants.AWS_REGION, REGION);
       consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");

       StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
       env.enableCheckpointing(50000);

       DataStream<EventAttributes> consumerStream = env.addSource(new FlinkKinesisConsumer<>(
               "dev-events", new KinesisSerializer(), consumerConfig));

       consumerStream
               .addSink(getProducer());
       env.execute("kinesis-example");
   }

   private static FlinkKinesisProducer<EventAttributes> getProducer(){
       Properties outputProperties = new Properties();
       outputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, REGION);
       outputProperties.setProperty("AggregationEnabled", "false");

       FlinkKinesisProducer<EventAttributes> sink = new FlinkKinesisProducer<>(new KinesisSerializer(), outputProperties);
       sink.setDefaultStream("dev-result");
       sink.setDefaultPartition("0");
       return sink;
   }
}

class KinesisSerializer implements DeserializationSchema<EventAttributes>, SerializationSchema<EventAttributes> {
   @Override
   public EventAttributes deserialize(byte[] bytes) throws IOException {
       return EventAttributes.fromByteBuffer(ByteBuffer.wrap(bytes));
   }

   @Override
   public boolean isEndOfStream(EventAttributes eventAttributes) {
       return false;
   }

   @Override
   public byte[] serialize(EventAttributes eventAttributes) {
       try {
           return eventAttributes.toByteBuffer().array();
       } catch (IOException e) {
           e.printStackTrace();
       }
       return new byte[1];
   }

   @Override
   public TypeInformation<EventAttributes> getProducedType() {
       return TypeInformation.of(EventAttributes.class);
   }
}

Код производителя Kinesis:

public class KinesisProducer {

   private static String streamName = "dev-events";

   public static void main(String[] args) throws InterruptedException, JsonMappingException {

       AmazonKinesis kinesisClient = getAmazonKinesisClient("us-east-1");

       try {
           sendData(kinesisClient, streamName);
       } catch (IOException e) {
           e.printStackTrace();
       }
   }

   private static AmazonKinesis getAmazonKinesisClient(String regionName) {

       AmazonKinesisClientBuilder clientBuilder = AmazonKinesisClientBuilder.standard();
       clientBuilder.setEndpointConfiguration(
               new AwsClientBuilder.EndpointConfiguration("kinesis.us-east-1.amazonaws.com",
                       regionName));
       clientBuilder.withCredentials(DefaultAWSCredentialsProviderChain.getInstance());
       clientBuilder.setClientConfiguration(new ClientConfiguration());

       return clientBuilder.build();
   }

   private static void sendData(AmazonKinesis kinesisClient, String streamName) throws IOException {

       PutRecordsRequest putRecordsRequest = new PutRecordsRequest();

       putRecordsRequest.setStreamName(streamName);
       List<PutRecordsRequestEntry> putRecordsRequestEntryList = new ArrayList<>();
       for (int i = 0; i < 50; i++) {
           PutRecordsRequestEntry putRecordsRequestEntry = new PutRecordsRequestEntry();
           EventAttributes eventAttributes = EventAttributes.newBuilder().setName("Jon.Doe").build();
           putRecordsRequestEntry.setData(eventAttributes.toByteBuffer());
           putRecordsRequestEntry.setPartitionKey(String.format("partitionKey-%d", i));
           putRecordsRequestEntryList.add(putRecordsRequestEntry);
       }

       putRecordsRequest.setRecords(putRecordsRequestEntryList);
       PutRecordsResult putRecordsResult = kinesisClient.putRecords(putRecordsRequest);
       System.out.println("Put Result" + putRecordsResult);
   }

Схема Avro в формате .avdl:

@version("0.1.0")
@namespace("com.naya.avro")
protocol UBXEventProtocol{

 record EventAttributes{
               union{null, string} name=null;
 }
}

Класс сгенерированной сущности Avro:

@org.apache.avro.specific.AvroGenerated
public class EventAttributes extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord {
  private static final long serialVersionUID = 2780976157169751219L;
  public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"EventAttributes\",\"namespace\":\"com.naya.avro\",\"fields\":[{\"name\":\"name\",\"type\":[\"null\",{\"type\":\"string\",\"avro.java.string\":\"String\"}],\"default\":null}]}");
  public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; }

  private static SpecificData MODEL$ = new SpecificData();

  private static final BinaryMessageEncoder<EventAttributes> ENCODER =
      new BinaryMessageEncoder<EventAttributes>(MODEL$, SCHEMA$);

  private static final BinaryMessageDecoder<EventAttributes> DECODER =
      new BinaryMessageDecoder<EventAttributes>(MODEL$, SCHEMA$);
…

Ссылки Github:

Не могли бы вы добавить более подробную информацию по этому вопросу? Почему он не работает на AWS?

Заранее спасибо

Ответы [ 2 ]

2 голосов
/ 17 марта 2020

Мы решили эту проблему. В приложении мы использовали Avro 1.9.1, но AWS KDA использует Avro 1.8.1. Понижение с 1.9.1 до 1.8.1 решило эту проблему.

2 голосов
/ 10 февраля 2020

Глядя на трассировку стека, похоже, что это происходит не при попытке прочитать сообщение, а на самом деле на этапе инициализации самого оператора.

Принцип работы Flink - он сериализует (используя сериализацию Java) каждый оператор, который необходимо выполнить, а затем распределяет их в сериализованном виде по кластеру. Это означает, что KinesisSerializer будет сериализован сам (как класс) для отправки по проводам.

Теперь проблема в том, что сериализатор Kinesis ссылается на модель EventAttributes, что означает, что ссылка на EventAttributes (сам класс, не указано c экземпляр) будет сериализовано с ним. И как часть сериализованных метаданных это то, что ожидается расширить / реализовать. В вашем случае требуется SpecificRecordBase, который не является частью вашего распространяемого, но является частью библиотеки Avro.

Таким образом, полная цепочка сериализации для самого оператора: KinesisConsumer -> KinesisSerializer -> EventAttributes -> SpecificRecordBase (часть Avro lib).

Однако AWS использует Flink 1.8, который использует Avro 1.8.2, и все базовые классы avro также относятся к 1.8.2. Вы компилируете свое приложение и связываете его с avro binaries 1.9. Поэтому, когда Flink пытается сериализовать ваших операторов и отправить их в кластер - он сериализует ссылку на SpecificRecordBase версии 1.9. Но когда Flink фактически пытается десериализовать его - он видит, что версия не соответствует классу, который он фактически имеет в наличии (1.8.2), и соединение не удается.

У вас есть 2 варианта здесь:

  1. Не используйте KDA. Вместо go для EMR (в комплект поставки 1.9.1 входит январь 2020 г.) или для автономного Flink (потребуется развернуть его вручную на EMR или barebone).
  2. Напишите свое приложение полностью с помощью Flink 1.8. Вы упомянули, что «с версией 1.8.2 приложение не компилируется» - попробуйте вместо этого решить эту проблему.
...