I have a Flink job that consumes Avro messages from Kafka. One of the Avro fields is a timestamp stored as a String, and I am trying to add a TimestampExtractor on that field.
final Kafka kafka = new Kafka()
        .version("0.9")
        .topic("my_topic")
        .properties(properties);

tEnv.connect(kafka)
        .withFormat(new Avro().recordClass(clazz))
        .withSchema(schema.getSchema())
        .inAppendMode()
        .registerTableSource("my_table");
When building the schema, I add a rowtime attribute to my timestamp field.
// the timestamp field
schema.field("rowtime", Types.SQL_TIMESTAMP)
        .rowtime(new Rowtime()
                .timestampsFromField(timestampField)
                .watermarksPeriodicBounded(period));
This code throws an exception. My timestamps are formatted like "2019-01-21 02:06:14 +0000", which apparently is invalid, and there is no way to pass a parsing format:
java.lang.NumberFormatException: For input string: "14 +0000"
at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
at java.lang.Integer.parseInt(Integer.java:580)
at java.lang.Integer.parseInt(Integer.java:615)
at org.apache.calcite.avatica.util.DateTimeUtils.timeStringToUnixDate(DateTimeUtils.java:668)
at org.apache.calcite.avatica.util.DateTimeUtils.timestampStringToUnixDate(DateTimeUtils.java:713)
at DataStreamSourceConversion$9.processElement(Unknown Source)
at org.apache.flink.table.runtime.CRowOutputProcessRunner.processElement(CRowOutputProcessRunner.scala:67)
at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:712)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:690)
at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:305)
at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:394)
at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecord(AbstractFetcher.java:362)
at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.emitRecord(Kafka09Fetcher.java:195)
at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:154)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:738)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:94)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:99)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
at java.lang.Thread.run(Thread.java:748)
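For reference, the string itself is easy to parse in plain Java, so the failure above is only because the built-in field extractor expects SQL timestamp syntax. A minimal sketch of the parse I need (the pattern is my inference from the sample value):

import java.time.OffsetDateTime;
import java.time.format.DateTimeFormatter;

// Parses "2019-01-21 02:06:14 +0000" into epoch milliseconds.
// "yyyy-MM-dd HH:mm:ss Z" is inferred from the sample value above.
DateTimeFormatter fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss Z");
long epochMillis = OffsetDateTime.parse("2019-01-21 02:06:14 +0000", fmt)
        .toInstant()
        .toEpochMilli();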
So I am trying to create a custom TimestampExtractor instead, but I run into exceptions no matter what I try.
// adding the custom extractor
schema.field("rowtime", Types.SQL_TIMESTAMP)
        .rowtime(new Rowtime()
                .timestampsFromExtractor(new StringTimestampExtractor(timestampField))
                .watermarksPeriodicBounded(period));
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.expressions.Expression;
import org.apache.flink.table.expressions.ExpressionParser;
import org.apache.flink.table.expressions.ResolvedFieldReference;
import org.apache.flink.table.sources.tsextractors.TimestampExtractor;

public class StringTimestampExtractor extends TimestampExtractor {

    public final String field;

    public StringTimestampExtractor() {
        this.field = null;
    }

    public StringTimestampExtractor(final String field) {
        this.field = field;
    }

    @Override
    public String[] getArgumentFields() {
        return new String[]{field};
    }

    @Override
    public void validateArgumentFields(TypeInformation<?>[] typeInformations) throws ValidationException {
        final TypeInformation<?> type = typeInformations[0];
        if (type != Types.STRING) {
            throw new ValidationException(
                    String.format("StringTimestampExtractor requires type String but was type %s", type.toString()));
        }
    }

    @Override
    public Expression getExpression(ResolvedFieldReference[] resolvedFieldReferences) {
        final ResolvedFieldReference fieldAccess = resolvedFieldReferences[0];
        // Trying to call a UDF throws an exception even though it's registered:
        return ExpressionParser.parseExpression(String.format("PING_DATEGMTISO_PARSER(%s)", fieldAccess.name()));
    }
}
The exception:
Caused by: org.apache.flink.table.api.UnresolvedException: trying to convert UnresolvedFunction PING_DATEGMTISO_PARSER to RexNode
at org.apache.flink.table.expressions.Call.toRexNode(call.scala:49)
at org.apache.flink.table.expressions.Cast.toRexNode(cast.scala:33)
at org.apache.flink.table.sources.TableSourceUtil$$anonfun$getRowtimeExtractionExpression$1.apply(TableSourceUtil.scala:383)
at org.apache.flink.table.sources.TableSourceUtil$$anonfun$getRowtimeExtractionExpression$1.apply(TableSourceUtil.scala:368)
at scala.Option.map(Option.scala:146)
at org.apache.flink.table.sources.TableSourceUtil$.getRowtimeExtractionExpression(TableSourceUtil.scala:368)
at org.apache.flink.table.plan.nodes.datastream.StreamTableSourceScan.translateToPlan(StreamTableSourceScan.scala:107)
at org.apache.flink.table.plan.nodes.datastream.DataStreamCalc.translateToPlan(DataStreamCalc.scala:97)
at org.apache.flink.table.plan.nodes.datastream.DataStreamGroupWindowAggregate.translateToPlan(DataStreamGroupWindowAggregate.scala:121)
at org.apache.flink.table.api.StreamTableEnvironment.translateToCRow(StreamTableEnvironment.scala:999)
at org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:926)
at org.apache.flink.table.api.StreamTableEnvironment.writeToSink(StreamTableEnvironment.scala:393)
at com.jwplayer.flink.connectors.TableFactory.createSink(TableFactory.java:43)
at com.jwplayer.flink.jobs.table.JWTableSinkJob.createTableSink(JWTableSinkJob.java:28)
at com.jwplayer.flink.jobs.BaseTableJob.runJob(BaseTableJob.java:170)
at com.jwplayer.flink.jobs.entrypoints.SqlJobEntryPoint.main(SqlJobEntryPoint.java:14)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529)
... 9 more
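For completeness, PING_DATEGMTISO_PARSER is registered up front as an ordinary scalar UDF. A simplified sketch of it (the actual implementation differs; the parsing details here are illustrative):

import org.apache.flink.table.functions.ScalarFunction;
import java.sql.Timestamp;
import java.time.OffsetDateTime;
import java.time.format.DateTimeFormatter;

// Illustrative sketch of the UDF referenced above.
public class PingDateGmtIsoParser extends ScalarFunction {

    public Timestamp eval(String timestamp) {
        // the formatter is cheap to build, so create it per call to keep the function serializable
        DateTimeFormatter fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss Z");
        return Timestamp.from(OffsetDateTime.parse(timestamp, fmt).toInstant());
    }
}

// registered before the table is used:
// tEnv.registerFunction("PING_DATEGMTISO_PARSER", new PingDateGmtIsoParser());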
I also tried hacking something together along these lines, but that throws an exception too:
@Override
public Expression getExpression(ResolvedFieldReference[] resolvedFieldReferences) {
    final ResolvedFieldReference fieldAccess = resolvedFieldReferences[0];
    return new Cast(new Cast(new Substring(fieldAccess, new CharLength(new CurrentTime())), SqlTimeTypeInfo.TIMESTAMP), Types.LONG);
}
The exception:
Caused by: org.apache.flink.table.codegen.CodeGenException: Unsupported call: CHAR_LENGTH
If you think this function should be supported, you can create an issue and start a discussion for it.
at org.apache.flink.table.codegen.CodeGenerator$$anonfun$visitCall$3.apply(CodeGenerator.scala:1025)
at org.apache.flink.table.codegen.CodeGenerator$$anonfun$visitCall$3.apply(CodeGenerator.scala:1025)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.flink.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:1025)
at org.apache.flink.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:67)
at org.apache.calcite.rex.RexCall.accept(RexCall.java:107)
at org.apache.flink.table.codegen.CodeGenerator$$anonfun$16.apply(CodeGenerator.scala:757)
at org.apache.flink.table.codegen.CodeGenerator$$anonfun$16.apply(CodeGenerator.scala:747)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.flink.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:747)
at org.apache.flink.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:67)
at org.apache.calcite.rex.RexCall.accept(RexCall.java:107)
at org.apache.flink.table.codegen.CodeGenerator$$anonfun$16.apply(CodeGenerator.scala:757)
at org.apache.flink.table.codegen.CodeGenerator$$anonfun$16.apply(CodeGenerator.scala:747)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.flink.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:747)
at org.apache.flink.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:67)
at org.apache.calcite.rex.RexCall.accept(RexCall.java:107)
at org.apache.flink.table.codegen.CodeGenerator$$anonfun$16.apply(CodeGenerator.scala:757)
at org.apache.flink.table.codegen.CodeGenerator$$anonfun$16.apply(CodeGenerator.scala:747)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.flink.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:747)
at org.apache.flink.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:67)
at org.apache.calcite.rex.RexCall.accept(RexCall.java:107)
at org.apache.flink.table.codegen.CodeGenerator$$anonfun$16.apply(CodeGenerator.scala:757)
at org.apache.flink.table.codegen.CodeGenerator$$anonfun$16.apply(CodeGenerator.scala:747)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.flink.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:747)
at org.apache.flink.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:67)
at org.apache.calcite.rex.RexCall.accept(RexCall.java:107)
at org.apache.flink.table.codegen.CodeGenerator.generateExpression(CodeGenerator.scala:248)
at org.apache.flink.table.codegen.CodeGenerator$$anonfun$3.apply(CodeGenerator.scala:274)
at org.apache.flink.table.codegen.CodeGenerator$$anonfun$3.apply(CodeGenerator.scala:270)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofInt.foreach(ArrayOps.scala:234)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.mutable.ArrayOps$ofInt.map(ArrayOps.scala:234)
at org.apache.flink.table.codegen.CodeGenerator.generateConverterResultExpression(CodeGenerator.scala:270)
at org.apache.flink.table.plan.nodes.datastream.StreamScan$class.generateConversionProcessFunction(StreamScan.scala:111)
at org.apache.flink.table.plan.nodes.datastream.StreamScan$class.convertToInternalRow(StreamScan.scala:74)
at org.apache.flink.table.plan.nodes.datastream.StreamTableSourceScan.convertToInternalRow(StreamTableSourceScan.scala:39)
at org.apache.flink.table.plan.nodes.datastream.StreamTableSourceScan.translateToPlan(StreamTableSourceScan.scala:116)
at org.apache.flink.table.plan.nodes.datastream.DataStreamCalc.translateToPlan(DataStreamCalc.scala:97)
at org.apache.flink.table.plan.nodes.datastream.DataStreamGroupWindowAggregate.translateToPlan(DataStreamGroupWindowAggregate.scala:121)
at org.apache.flink.table.api.StreamTableEnvironment.translateToCRow(StreamTableEnvironment.scala:999)
at org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:926)
at org.apache.flink.table.api.StreamTableEnvironment.writeToSink(StreamTableEnvironment.scala:393)
I am running Flink 1.6.2 in a Docker container. Unfortunately, I cannot change the String timestamp format of my input data. I could route this DataStream into another one and add an extra column in SQL that runs my UDF, but I would prefer not to work around the problem that way.
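For reference, the workaround I would rather avoid looks roughly like this (a sketch, assuming a DataStream<MyAvroRecord> named stream; MyAvroRecord and getTimestampField() are placeholders for my generated Avro class):

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import java.time.OffsetDateTime;
import java.time.format.DateTimeFormatter;

// Assign event time on the DataStream before handing it to the Table API.
DataStream<MyAvroRecord> withTimestamps = stream.assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor<MyAvroRecord>(Time.seconds(5)) {
            @Override
            public long extractTimestamp(MyAvroRecord record) {
                // build the formatter per record to keep the function serializable
                DateTimeFormatter fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss Z");
                return OffsetDateTime.parse(record.getTimestampField(), fmt)
                        .toInstant().toEpochMilli();
            }
        });

// expose the assigned timestamp as a rowtime attribute
tEnv.registerDataStream("my_table", withTimestamps, "timestampField, rowtime.rowtime");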
Any guidance on how I can run a UDF inside a TimestampExtractor would be greatly appreciated.