Flink SQL Поле результата не соответствует запрошенной ошибке типа в LocalDateTime - PullRequest
0 голосов
/ 07 января 2020

Когда я сгруппирую нижнее, выберите его, получая ошибку обработки типа. Я уже пытаюсь CAST как TIMESTAMP и пытаюсь изменить тип POJOs LocalDateTime. Большинство примеров кодов преобразуется, так как Row.class не может найти ни одного примера пользовательского класса.

SELECT name, MIN(price) AS minPrice, MAX(price) AS maxPrice, AVG(price) AS avarage, COUNT(name) as sayi, TUMBLE_START(rowtime, INTERVAL '5' SECOND) AS zaman FROM STOCK GROUP BY TUMBLE(rowtime, INTERVAL '5' SECOND), name

Брошенная ошибка:

Exception in thread "main" org.apache.flink.table.api.TableException: Result field 'zaman' does not match requested type. Requested: GenericType<java.time.LocalDateTime>; Actual: LocalDateTime

Код:

    tableEnvironment.registerDataStream("STOCK", messageStream, "name, price, rowtime.rowtime");

    Table result = tableEnvironment.sqlQuery(
            "SELECT name, MIN(price) AS minPrice, MAX(price) AS maxPrice, AVG(price) AS avarage, COUNT(name) as sayi, TUMBLE_START(rowtime, INTERVAL '5' SECOND) AS zaman FROM STOCK GROUP BY TUMBLE(rowtime, INTERVAL '5' SECOND), name");

    result.printSchema();

    FlinkKafkaProducer011<String> myProducer = new FlinkKafkaProducer011<String>(kp.getProducerProperties().getProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG),
            "STOCKGROUP", new SimpleStringSchema());

    myProducer.setWriteTimestampToKafka(true);

    DataStream<Tuple2<Boolean, StockGroup>> stream = tableEnvironment.toRetractStream(result, StockGroup.class);

    stream.map(x -> x.f1.toString()).addSink(myProducer);

StockGroup.class POJO;

public String name;
public Double minPrice;
public Double maxPrice;
public Double avarage;
public Long sayi;
public LocalDateTime zaman;

Печатная схема;

root
 |-- name: STRING
 |-- minPrice: DOUBLE
 |-- maxPrice: DOUBLE
 |-- avarage: DOUBLE
 |-- sayi: BIGINT NOT NULL
 |-- zaman: TIMESTAMP(3) *ROWTIME*
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...