Flink: Call UDF from a custom TimestampExtractor
22 January 2019

I have a Flink job that consumes Avro messages from Kafka. One of the fields in the Avro record is a String timestamp. I am trying to attach a TimestampExtractor to this field.

final Kafka kafka = new Kafka()
    .version("0.9")
    .topic("my_topic")
    .properties(properties);

tEnv.connect(kafka)
    .withFormat(new Avro().recordClass(clazz))
    .withSchema(schema.getSchema())
    .inAppendMode()
    .registerTableSource("my_table");

When building the schema, I declare a rowtime attribute on my timestamp field.

// the timestamp field
schema.field("rowtime", Types.SQL_TIMESTAMP)
    .rowtime(new Rowtime()
        .timestampsFromField(timestampField)
        .watermarksPeriodicBounded(period));

This code throws an exception. My timestamps are formatted as "2019-01-21 02:06:14 +0000", which is apparently not a valid format to hand to the parser.

java.lang.NumberFormatException: For input string: "14 +0000"
    at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
    at java.lang.Integer.parseInt(Integer.java:580)
    at java.lang.Integer.parseInt(Integer.java:615)
    at org.apache.calcite.avatica.util.DateTimeUtils.timeStringToUnixDate(DateTimeUtils.java:668)
    at org.apache.calcite.avatica.util.DateTimeUtils.timestampStringToUnixDate(DateTimeUtils.java:713)
    at DataStreamSourceConversion$9.processElement(Unknown Source)
    at org.apache.flink.table.runtime.CRowOutputProcessRunner.processElement(CRowOutputProcessRunner.scala:67)
    at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:712)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:690)
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:305)
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:394)
    at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecord(AbstractFetcher.java:362)
    at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.emitRecord(Kafka09Fetcher.java:195)
    at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:154)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:738)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:94)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:99)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
    at java.lang.Thread.run(Thread.java:748)
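
For reference, the sample value can be parsed in plain Java along these lines (a sketch; the pattern is my reading of "2019-01-21 02:06:14 +0000", not a confirmed spec):

import java.time.OffsetDateTime;
import java.time.format.DateTimeFormatter;

// Sketch: parse "2019-01-21 02:06:14 +0000" into epoch milliseconds.
static final DateTimeFormatter TS_FORMAT =
        DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss Z");

static long parseTimestamp(String value) {
    return OffsetDateTime.parse(value, TS_FORMAT).toInstant().toEpochMilli();
}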

So I am trying to write a custom TimestampExtractor, but I keep running into exceptions no matter what I try.

// adding the custom extractor
schema.field("rowtime", Types.SQL_TIMESTAMP)
    .rowtime(new Rowtime()
        .timestampsFromExtractor(new StringTimestampExtractor(timestampField))
        .watermarksPeriodicBounded(period));

And the extractor itself:

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.expressions.Expression;
import org.apache.flink.table.expressions.ExpressionParser;
import org.apache.flink.table.expressions.ResolvedFieldReference;
import org.apache.flink.table.sources.tsextractors.TimestampExtractor;

public class StringTimestampExtractor extends TimestampExtractor {
    public final String field;

    public StringTimestampExtractor() {
        this.field = null;
    }

    public StringTimestampExtractor(final String field) {
        this.field = field;
    }

    @Override
    public String[] getArgumentFields() {
        return new String[]{field};
    }

    @Override
    public void validateArgumentFields(TypeInformation<?>[] typeInformations) throws ValidationException {
        final TypeInformation<?> type = typeInformations[0];

        if (!Types.STRING.equals(type)) {
            throw new ValidationException(
                    String.format("StringTimestampExtractor requires type String but was type %s", type));
        }
    }

    @Override
    public Expression getExpression(ResolvedFieldReference[] resolvedFieldReferences) {
        final ResolvedFieldReference fieldAccess = resolvedFieldReferences[0];

        // Calling the UDF here throws an exception even though it is registered:
        return ExpressionParser.parseExpression(String.format("PING_DATEGMTISO_PARSER(%s)", fieldAccess.name()));
    }
}
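
For context, the UDF itself is registered on the table environment before the query runs, roughly like this (a sketch; PingDateGmtIsoParser is an assumed name wrapping the parseTimestamp helper sketched above):

import java.sql.Timestamp;
import org.apache.flink.table.functions.ScalarFunction;

// Sketch (assumed class name): scalar UDF that parses the string timestamp.
public class PingDateGmtIsoParser extends ScalarFunction {
    public Timestamp eval(String value) {
        return new Timestamp(parseTimestamp(value));
    }
}

tEnv.registerFunction("PING_DATEGMTISO_PARSER", new PingDateGmtIsoParser());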

The exception:

Caused by: org.apache.flink.table.api.UnresolvedException: trying to convert UnresolvedFunction PING_DATEGMTISO_PARSER to RexNode
    at org.apache.flink.table.expressions.Call.toRexNode(call.scala:49)
    at org.apache.flink.table.expressions.Cast.toRexNode(cast.scala:33)
    at org.apache.flink.table.sources.TableSourceUtil$$anonfun$getRowtimeExtractionExpression$1.apply(TableSourceUtil.scala:383)
    at org.apache.flink.table.sources.TableSourceUtil$$anonfun$getRowtimeExtractionExpression$1.apply(TableSourceUtil.scala:368)
    at scala.Option.map(Option.scala:146)
    at org.apache.flink.table.sources.TableSourceUtil$.getRowtimeExtractionExpression(TableSourceUtil.scala:368)
    at org.apache.flink.table.plan.nodes.datastream.StreamTableSourceScan.translateToPlan(StreamTableSourceScan.scala:107)
    at org.apache.flink.table.plan.nodes.datastream.DataStreamCalc.translateToPlan(DataStreamCalc.scala:97)
    at org.apache.flink.table.plan.nodes.datastream.DataStreamGroupWindowAggregate.translateToPlan(DataStreamGroupWindowAggregate.scala:121)
    at org.apache.flink.table.api.StreamTableEnvironment.translateToCRow(StreamTableEnvironment.scala:999)
    at org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:926)
    at org.apache.flink.table.api.StreamTableEnvironment.writeToSink(StreamTableEnvironment.scala:393)
    at com.jwplayer.flink.connectors.TableFactory.createSink(TableFactory.java:43)
    at com.jwplayer.flink.jobs.table.JWTableSinkJob.createTableSink(JWTableSinkJob.java:28)
    at com.jwplayer.flink.jobs.BaseTableJob.runJob(BaseTableJob.java:170)
    at com.jwplayer.flink.jobs.entrypoints.SqlJobEntryPoint.main(SqlJobEntryPoint.java:14)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529)
    ... 9 more

I also tried hacking together something like the following, but that throws an exception as well:

    @Override
    public Expression getExpression(ResolvedFieldReference[] resolvedFieldReferences) {
        final ResolvedFieldReference fieldAccess = resolvedFieldReferences[0];
        return new Cast(new Cast(new Substring(fieldAccess, new CharLength(new CurrentTime())), SqlTimeTypeInfo.TIMESTAMP), Types.LONG);
    }

The exception:

Caused by: org.apache.flink.table.codegen.CodeGenException: Unsupported call: CHAR_LENGTH 
If you think this function should be supported, you can create an issue and start a discussion for it.
    at org.apache.flink.table.codegen.CodeGenerator$$anonfun$visitCall$3.apply(CodeGenerator.scala:1025)
    at org.apache.flink.table.codegen.CodeGenerator$$anonfun$visitCall$3.apply(CodeGenerator.scala:1025)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.flink.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:1025)
    at org.apache.flink.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:67)
    at org.apache.calcite.rex.RexCall.accept(RexCall.java:107)
    at org.apache.flink.table.codegen.CodeGenerator$$anonfun$16.apply(CodeGenerator.scala:757)
    at org.apache.flink.table.codegen.CodeGenerator$$anonfun$16.apply(CodeGenerator.scala:747)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.AbstractTraversable.map(Traversable.scala:104)
    at org.apache.flink.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:747)
    at org.apache.flink.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:67)
    at org.apache.calcite.rex.RexCall.accept(RexCall.java:107)
    at org.apache.flink.table.codegen.CodeGenerator$$anonfun$16.apply(CodeGenerator.scala:757)
    at org.apache.flink.table.codegen.CodeGenerator$$anonfun$16.apply(CodeGenerator.scala:747)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.AbstractTraversable.map(Traversable.scala:104)
    at org.apache.flink.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:747)
    at org.apache.flink.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:67)
    at org.apache.calcite.rex.RexCall.accept(RexCall.java:107)
    at org.apache.flink.table.codegen.CodeGenerator$$anonfun$16.apply(CodeGenerator.scala:757)
    at org.apache.flink.table.codegen.CodeGenerator$$anonfun$16.apply(CodeGenerator.scala:747)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.AbstractTraversable.map(Traversable.scala:104)
    at org.apache.flink.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:747)
    at org.apache.flink.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:67)
    at org.apache.calcite.rex.RexCall.accept(RexCall.java:107)
    at org.apache.flink.table.codegen.CodeGenerator$$anonfun$16.apply(CodeGenerator.scala:757)
    at org.apache.flink.table.codegen.CodeGenerator$$anonfun$16.apply(CodeGenerator.scala:747)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.AbstractTraversable.map(Traversable.scala:104)
    at org.apache.flink.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:747)
    at org.apache.flink.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:67)
    at org.apache.calcite.rex.RexCall.accept(RexCall.java:107)
    at org.apache.flink.table.codegen.CodeGenerator.generateExpression(CodeGenerator.scala:248)
    at org.apache.flink.table.codegen.CodeGenerator$$anonfun$3.apply(CodeGenerator.scala:274)
    at org.apache.flink.table.codegen.CodeGenerator$$anonfun$3.apply(CodeGenerator.scala:270)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofInt.foreach(ArrayOps.scala:234)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.mutable.ArrayOps$ofInt.map(ArrayOps.scala:234)
    at org.apache.flink.table.codegen.CodeGenerator.generateConverterResultExpression(CodeGenerator.scala:270)
    at org.apache.flink.table.plan.nodes.datastream.StreamScan$class.generateConversionProcessFunction(StreamScan.scala:111)
    at org.apache.flink.table.plan.nodes.datastream.StreamScan$class.convertToInternalRow(StreamScan.scala:74)
    at org.apache.flink.table.plan.nodes.datastream.StreamTableSourceScan.convertToInternalRow(StreamTableSourceScan.scala:39)
    at org.apache.flink.table.plan.nodes.datastream.StreamTableSourceScan.translateToPlan(StreamTableSourceScan.scala:116)
    at org.apache.flink.table.plan.nodes.datastream.DataStreamCalc.translateToPlan(DataStreamCalc.scala:97)
    at org.apache.flink.table.plan.nodes.datastream.DataStreamGroupWindowAggregate.translateToPlan(DataStreamGroupWindowAggregate.scala:121)
    at org.apache.flink.table.api.StreamTableEnvironment.translateToCRow(StreamTableEnvironment.scala:999)
    at org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:926)
    at org.apache.flink.table.api.StreamTableEnvironment.writeToSink(StreamTableEnvironment.scala:393)

I am running Flink 1.6.2 in a Docker container. Unfortunately, I cannot change the String timestamp format of my input data. I could pipe this DataStream into another one and add an extra column in SQL that applies my UDF, but I would rather not work around the problem that way.
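
One version of that workaround would look roughly like the following (a minimal sketch with assumed names: env, kafkaConsumer, MyAvroRecord, the getter, and the parseTimestamp helper from above): parse the string on the DataStream, assign timestamps and watermarks there, and register the stream with an appended rowtime attribute.

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;

// Sketch: assign event timestamps on the DataStream before the Table API sees it.
DataStream<MyAvroRecord> withTimestamps = env
    .addSource(kafkaConsumer)
    .assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor<MyAvroRecord>(Time.seconds(5)) {
            @Override
            public long extractTimestamp(MyAvroRecord record) {
                return parseTimestamp(record.getTimestampField()); // hypothetical getter
            }
        });

// "rowtime" is appended and carries the assigned event timestamps.
tEnv.registerDataStream("my_table", withTimestamps, "someField, otherField, rowtime.rowtime");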

Any guidance on how to execute a UDF inside a TimestampExtractor would be greatly appreciated.
