Проблема API таблицы Flink: класс не найден TypeDeserializerAdapter - PullRequest
0 голосов
/ 29 января 2020

Я пытаюсь использовать Flink 1.8.2 Table API, собирая fat-jar с maven. Строительный проект успешен. Приложение отлично работает с Stream API. Я могу читать данные из потока Kinesis, могу анализировать json, фильтровать по значениям поля. Я хочу использовать Table API с SQL запросами, поскольку это упрощает обработку данных

mvn clean install

[INFO] BUILD SUCCESS

Но при попытке запустить * он не работает

mvn exe c: java -X

[DEBUG] Setting accessibility to true in order to invoke main() 
INFO  org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer  - Flink Kinesis Consumer is going to read the following streams: stream-logs-stage, 
INFO  org.apache.flink.api.java.typeutils.TypeExtractor  - class org.apache.flink.types.Row does not contain a getter for field fields
INFO  org.apache.flink.api.java.typeutils.TypeExtractor  - class org.apache.flink.types.Row does not contain a setter for field fields 
INFO  org.apache.flink.api.java.typeutils.TypeExtractor  - Class class org.apache.flink.types.Row cannot be used as a POJO type because not all fields are valid POJO fields, and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance.

[WARNING] java.lang.NoClassDefFoundError: org/apache/flink/api/common/typeutils/TypeDeserializerAdapter
Caused by: java.lang.ClassNotFoundException: org.apache.flink.api.common.typeutils.TypeDeserializerAdapter
[INFO] ------------------------------------------------------------------------
[INFO] BUILD FAILURE

Стабильно на String explanation = tableEnv.explain(resultTable); или при попытке сохранить таблицу в файл CSV.

Пример кода:

    StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

    //Convert from DataStream to Table
    Table nginxLogsTable = tableEnv.fromDataStream(parsedJsonStream, "origin, timestamp, uri, status, source, remote_addr");

    //Register the table for use in SQL queries
    tableEnv.registerTable("nginxLogs", nginxLogsTable);

    //Define a new Dynamic Table as the results of a SQL Query.
    Table resultTable = tableEnv.sqlQuery(""+
            "SELECT logs.origin, " +
            "  COUNT(DISTINCT logs.remote_addr) " +
            "FROM nginxLogs logs " +
            "GROUP BY logs.origin"
    );

    String explanation = tableEnv.explain(resultTable);
    System.out.println(explanation);

Как это исправить? Это не помогло mvn install exec:java -Dexec.classpathScope="test" -X

...