TableSource возвратил DataStream, который не соответствует типу, объявленному методом TableSource.getReturnType () - PullRequest
0 голосов
/ 30 октября 2019

У меня есть ошибка во Flink, которую я не понимаю.

Я создал пользовательский StreamTableSource, который выглядит следующим образом:

public class MyCustomTableSource  implements StreamTableSource<Row> {

    /** The schema of the table. */
    private final InitConfig initialisationConfig;
    private final StreamTableSource updateSource;

    public SdpUpdatedTableSource(InitConfig initialisationConfig, StreamTableSource updateSource) {
        this.initialisationConfig = initialisationConfig;
        this.updateSource = updateSource;
    }

    @Override
    public DataStream<Row> getDataStream(final StreamExecutionEnvironment env) {
        String initializationEndpoint = initialisationConfig.get("endpoint");
        String[] fieldsName = updateSource.getTableSchema().getFieldNames();

        DataStream<Row> stream = updateSource.getDataStream(env)
                .map(RowToMapObject(fieldsName))                    // Parse the row into a Map<String, Object> object
                .map(MyRichMapFunction(initializationEndpoint))     // Rich Map Function that initialize some data in the open() function using initializationEndpoint
                                                                    // And fuse the stored data & update data in the map. Output is a Map<String,Object> with same structure as the one in input
                .map(m->(Row)m);                                    // Parse back the Map into a Row

        return stream;
    }

    @Override
    public TypeInformation<Row> getReturnType() {
        return updateSource.getReturnType();
    }

    @Override
    public TableSchema getTableSchema() {
        return updateSource.getTableSchema();
    }

    @Override
    public String explainSource() {
        return initialisationConfig.getId()+"_"+updateSource.explainSource();
    }

Цель этого состоит в том, чтобыDataStream, который извлекает некоторые данные при инициализации и применяет обновления к этим сохраненным данным для отправки в новый поток.

Но когда я пытаюсь использовать это StreamTableSource, у меня появляется следующая ошибка:

TableSource of type my.package.MyCustomTableSource returned a DataStream of type GenericType<org.apache.flink.types.Row> that does not match with the type Row(operation: String, data: Row(id: String, name: String, [...})) declared by the TableSource.getReturnType() method. Please validate the implementation of the TableSource.

И я не очень понимаю, почему. У кого-нибудь есть идея?

...