У меня есть ошибка во Flink, которую я не понимаю.
Я создал пользовательский StreamTableSource
, который выглядит следующим образом:
public class MyCustomTableSource implements StreamTableSource<Row> {
/** The schema of the table. */
private final InitConfig initialisationConfig;
private final StreamTableSource updateSource;
public SdpUpdatedTableSource(InitConfig initialisationConfig, StreamTableSource updateSource) {
this.initialisationConfig = initialisationConfig;
this.updateSource = updateSource;
}
@Override
public DataStream<Row> getDataStream(final StreamExecutionEnvironment env) {
String initializationEndpoint = initialisationConfig.get("endpoint");
String[] fieldsName = updateSource.getTableSchema().getFieldNames();
DataStream<Row> stream = updateSource.getDataStream(env)
.map(RowToMapObject(fieldsName)) // Parse the row into a Map<String, Object> object
.map(MyRichMapFunction(initializationEndpoint)) // Rich Map Function that initialize some data in the open() function using initializationEndpoint
// And fuse the stored data & update data in the map. Output is a Map<String,Object> with same structure as the one in input
.map(m->(Row)m); // Parse back the Map into a Row
return stream;
}
@Override
public TypeInformation<Row> getReturnType() {
return updateSource.getReturnType();
}
@Override
public TableSchema getTableSchema() {
return updateSource.getTableSchema();
}
@Override
public String explainSource() {
return initialisationConfig.getId()+"_"+updateSource.explainSource();
}
Цель этого состоит в том, чтобыDataStream, который извлекает некоторые данные при инициализации и применяет обновления к этим сохраненным данным для отправки в новый поток.
Но когда я пытаюсь использовать это StreamTableSource
, у меня появляется следующая ошибка:
TableSource of type my.package.MyCustomTableSource returned a DataStream of type GenericType<org.apache.flink.types.Row> that does not match with the type Row(operation: String, data: Row(id: String, name: String, [...})) declared by the TableSource.getReturnType() method. Please validate the implementation of the TableSource.
И я не очень понимаю, почему. У кого-нибудь есть идея?