I have two Kafka streams that I read with Flink's Table source API in Scala, join over a time window, and load into a Cassandra table sink. The job fails when inserting the data into Cassandra; the full error is below. As far as I can tell, the problem occurs when the table is translated into a data stream. I would appreciate help resolving this.
My requirement is to join the two Kafka streams on the source timestamp of the message. I think the approach I used is reasonable, but please suggest a better way if there is one. I am new to Flink, so any guidance is appreciated.
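One variant I have been considering, in case it is relevant: declare the rowtime from the ts field that is already in the JSON payload instead of from the Kafka source record. A minimal sketch of that schema (my assumption, not verified):

val orderSchemaFromField = new Schema()
  .field("order_num", Types.STRING())
  .field("status", Types.STRING())
  .field("ts", Types.SQL_TIMESTAMP())
  .rowtime(new Rowtime().timestampsFromField("ts").watermarksPeriodicBounded(1000))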
This is my code:
import com.datastax.driver.core.{Cluster, PlainTextAuthProvider}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.cassandra._
import org.apache.flink.table.api.{EnvironmentSettings, Table, Types}
import org.apache.flink.table.api.scala._
import org.apache.flink.table.descriptors.{Json, Kafka, Rowtime, Schema}

// senv is pre-bound in the Flink Scala shell; when packaging this as a job,
// create the environment explicitly
val senv = StreamExecutionEnvironment.getExecutionEnvironment
senv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

// Table environment backed by the Blink planner in streaming mode
val blinkSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
val stenv = StreamTableEnvironment.create(senv, blinkSettings)
// Shared schema for both topics; the rowtime is taken from the Kafka source timestamp
val orderSchema = new Schema()
  .field("order_num", Types.STRING())
  .field("status", Types.STRING())
  .field("ts", Types.SQL_TIMESTAMP())
  .rowtime(new Rowtime().timestampsFromSource().watermarksPeriodicBounded(1000))
// Register the first Kafka topic as a table source
stenv.connect(
    new Kafka()
      .version("0.11")
      .topic("Stream1")
      .startFromLatest()
      .property("bootstrap.servers", "Bootstrap.Servers"))
  .withFormat(
    new Json()
      .failOnMissingField(false)
      .deriveSchema())
  .withSchema(orderSchema)
  .inAppendMode()
  .registerTableSource("Stream1_Events")
// Register the second Kafka topic as a table source with the same schema
stenv.connect(
    new Kafka()
      .version("0.11")
      .topic("Stream2")
      .startFromLatest()
      .property("bootstrap.servers", "Bootstrap.Servers"))
  .withFormat(
    new Json()
      .failOnMissingField(false)
      .deriveSchema())
  .withSchema(orderSchema)
  .inAppendMode()
  .registerTableSource("Stream2_Events")
val sevent1: Table = stenv.scan("Stream1_Events")
val sevent2: Table = stenv.scan("Stream2_Events").as('order_num1, 'status1, 'ts1)

// Interval join on order_num, keeping pairs whose timestamps lie within
// -5/+10 minutes of each other
val joinResult = sevent1
  .join(sevent2)
  .where('order_num === 'order_num1 && 'ts >= 'ts1 - 5.minutes && 'ts < 'ts1 + 10.minutes)
  .select('order_num, 'status, 'ts)
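// For reference, my reading of the same interval join in SQL (an equivalent I
// have not actually run, just for comparison):
// val joinResult = stenv.sqlQuery(
//   """
//     |SELECT s1.order_num, s1.status, s1.ts
//     |FROM Stream1_Events s1, Stream2_Events s2
//     |WHERE s1.order_num = s2.order_num
//     |  AND s1.ts BETWEEN s2.ts - INTERVAL '5' MINUTE AND s2.ts + INTERVAL '10' MINUTE
//     |""".stripMargin)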
// Cassandra cluster connection ("HostIP" and PORT are placeholders)
val clusterBuilder: ClusterBuilder = new ClusterBuilder {
  override def buildCluster(builder: Cluster.Builder): Cluster =
    builder
      .addContactPoint("HostIP")
      .withPort(PORT)
      .withAuthProvider(new PlainTextAuthProvider("cassandra", "cassandra"))
      .build()
}

val sink: CassandraAppendTableSink = new CassandraAppendTableSink(
  clusterBuilder,
  "INSERT INTO dm.order_join (order_num, status, ts) VALUES (?, ?, ?)")

stenv.registerTableSink(
  "cassandraOutputTable",
  Array[String]("order_num", "status", "ts"),
  Array[TypeInformation[_]](Types.STRING(), Types.STRING(), Types.SQL_TIMESTAMP()),
  sink)

joinResult.insertInto("cassandraOutputTable")
senv.execute("Kafka Stream Table")
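In case the table sink itself turns out to be the problem, an alternative I am considering is converting the joined table to a DataStream and writing it with the plain Cassandra sink instead. A rough sketch of that idea (untested; the tuple layout is my assumption):

import java.sql.Timestamp

// Convert the append-only join result into a stream of Scala tuples
val resultStream: DataStream[(String, String, Timestamp)] =
  joinResult.toAppendStream[(String, String, Timestamp)]

CassandraSink.addSink(resultStream)
  .setQuery("INSERT INTO dm.order_join (order_num, status, ts) VALUES (?, ?, ?)")
  .setClusterBuilder(clusterBuilder)
  .build()

senv.execute("Kafka Stream Table")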
Here are the exception details:
java.lang.UnsupportedOperationException
at org.apache.flink.table.planner.expressions.RexNodeConverter.visit(RexNodeConverter.java:949)
at org.apache.flink.table.planner.expressions.RexNodeConverter.visit(RexNodeConverter.java:889)
at org.apache.flink.table.planner.expressions.RexNodeConverter.visit(RexNodeConverter.java:127)
at org.apache.flink.table.expressions.UnresolvedCallExpression.accept(UnresolvedCallExpression.java:135)
at org.apache.flink.table.planner.expressions.RexNodeConverter.convertReinterpretCast(RexNodeConverter.java:652)
at org.apache.flink.table.planner.expressions.RexNodeConverter.lambda$new$1(RexNodeConverter.java:140)
at org.apache.flink.table.planner.expressions.RexNodeConverter$RexNodeConversion.convert(RexNodeConverter.java:1040)
at org.apache.flink.table.planner.expressions.RexNodeConverter.visit(RexNodeConverter.java:942)
at org.apache.flink.table.planner.expressions.RexNodeConverter.visit(RexNodeConverter.java:889)
at org.apache.flink.table.planner.expressions.RexNodeConverter.visit(RexNodeConverter.java:127)
at org.apache.flink.table.expressions.UnresolvedCallExpression.accept(UnresolvedCallExpression.java:135)
at org.apache.flink.table.planner.sources.TableSourceUtil$$anonfun$getRowtimeExtractionExpression$1.apply(TableSourceUtil.scala:293)
at org.apache.flink.table.planner.sources.TableSourceUtil$$anonfun$getRowtimeExtractionExpression$1.apply(TableSourceUtil.scala:270)
at scala.Option.map(Option.scala:146)
at org.apache.flink.table.planner.sources.TableSourceUtil$.getRowtimeExtractionExpression(TableSourceUtil.scala:270)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:117)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:55)
at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlan(StreamExecTableSourceScan.scala:55)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange.translateToPlanInternal(StreamExecExchange.scala:84)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange.translateToPlanInternal(StreamExecExchange.scala:44)
at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange.translateToPlan(StreamExecExchange.scala:44)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecWindowJoin.translateToPlanInternal(StreamExecWindowJoin.scala:135)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecWindowJoin.translateToPlanInternal(StreamExecWindowJoin.scala:52)
at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecWindowJoin.translateToPlan(StreamExecWindowJoin.scala:52)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:86)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:46)
at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlan(StreamExecCalc.scala:46)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToTransformation(StreamExecSink.scala:185)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:133)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:50)
at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:50)
at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:61)
at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:60)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:60)
at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:149)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:439)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.insertInto(TableEnvironmentImpl.java:327)
at org.apache.flink.table.api.internal.TableImpl.insertInto(TableImpl.java:411)
... 30 elided