Почему я получаю это исключение "Поле таблицы не найдено в типе возврата" - PullRequest
0 голосов
/ 06 июня 2019

Я пытаюсь создать источник таблицы потоков с некоторыми фиктивными данными.

Когда дело доходит до регистрации его в виде таблицы. Я получаю исключение

org.apache.flink.table.api.ValidationException: поле таблицы 'ItemID' не найдено в типе возврата Row (f0: String, f1: String) TableSource.

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.Types;
import org.apache.flink.table.sources.StreamTableSource;
import org.apache.flink.types.Row;


public class ItemsSource implements StreamTableSource<Row> {
@Override
public TypeInformation<Row> getReturnType() {


RowTypeInfo typeInfo = new RowTypeInfo(Types.STRING(),Types.STRING());


    return typeInfo;
}

@Override
public TableSchema getTableSchema() {

    TypeInformation[] typeInfo = new TypeInformation[2];
    typeInfo[0] = Types.STRING();
    typeInfo[1] = Types.STRING();
    return new TableSchema(new String[] {"ItemID", "LineID"}, typeInfo);
}

@Override
public String explainSource() {
    return "Item ID and Line ID";
}

@Override
public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {

    DataStream<Row> dummy = execEnv.fromElements("1,1","2,2","3,3").map(new MapFunction<String, Row>() {
        @Override
        public Row map(String s) throws Exception {
            String[] values = s.split(",");
            return Row.of(values[0],values[1]);
        }
    });
    return dummy;
}

}

...