UnsupportedOperationException when inserting joined table source data from 2 Kafka streams in Flink
0 votes
/ 03 November 2019

I have two Kafka streams that I read with the table source API in Flink (Scala), join over a time window, and load into a Cassandra table sink. The job fails when it inserts the joined data into Cassandra; the full error is below. As far as I can tell, the problem occurs when the planner converts the table back into a data stream. I need help resolving this.

My requirement is to match the two Kafka streams on the source timestamp carried in each message. I believe the approach I used makes sense, but if there is a better way to do this, please suggest it. I'm new to Flink, so I'm looking for guidance.

Here is my code:

// Deduplicated imports; the Scala Table API flavor is used throughout.
import com.datastax.driver.core.{Cluster, PlainTextAuthProvider}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.cassandra.{CassandraAppendTableSink, ClusterBuilder}
import org.apache.flink.table.api.{EnvironmentSettings, Table, Types}
import org.apache.flink.table.api.scala._
import org.apache.flink.table.descriptors.{Json, Kafka, Rowtime, Schema}

// senv is the pre-defined StreamExecutionEnvironment of the Flink Scala shell.
val blinkPlanner = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
val stenv = StreamTableEnvironment.create(senv, blinkPlanner)

senv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

val orderSchema = new Schema()
  .field("order_num", Types.STRING())
  .field("status", Types.STRING())
  .field("ts", Types.SQL_TIMESTAMP())
  .rowtime(new Rowtime().timestampsFromSource().watermarksPeriodicBounded(1000))
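The stack trace below points into the rowtime extraction (TableSourceUtil.getRowtimeExtractionExpression), so declaring the rowtime from the ts field in the payload, rather than from the source record timestamp, may be worth trying; a minimal sketch of that variant (orderSchemaFromField is just an illustrative name):

// Sketch (assumption): derive the event time from the JSON "ts" field itself
// instead of the Kafka record timestamp used by timestampsFromSource().
val orderSchemaFromField = new Schema()
  .field("order_num", Types.STRING())
  .field("status", Types.STRING())
  .field("ts", Types.SQL_TIMESTAMP())
  .rowtime(new Rowtime().timestampsFromField("ts").watermarksPeriodicBounded(1000))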

stenv.connect(
    new Kafka()
      .version("0.11")
      .topic("Stream1")
      .startFromLatest()
      .property("bootstrap.servers", "Bootstrap.Servers"))
  .withFormat(
    new Json()
      .failOnMissingField(false)
      .deriveSchema())
  .withSchema(orderSchema)
  .inAppendMode()
  .registerTableSource("Stream1_Events")

stenv.connect(
    new Kafka()
      .version("0.11")
      .topic("Stream2")
      .startFromLatest()
      .property("bootstrap.servers", "Bootstrap.Servers"))
  .withFormat(
    new Json()
      .failOnMissingField(false)
      .deriveSchema())
  .withSchema(orderSchema)
  .inAppendMode()
  .registerTableSource("Stream2_Events")
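For context, both topics are assumed to carry JSON messages matching the schema above, e.g. {"order_num": "A123", "status": "SHIPPED", "ts": "2019-11-03T10:15:00Z"} (hypothetical values; the ts format has to be parseable as SQL_TIMESTAMP).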


val Sevent1: Table = stenv.scan("Stream1_Events")
val Sevent2: Table = stenv.scan("Stream2_Events").as('order_num1, 'status1, 'ts1)

// Interval join on order_num, keeping pairs whose event times fall within
// a bounded window of each other.
val joinResult = Sevent1.join(Sevent2)
  .where('order_num === 'order_num1 && 'ts >= 'ts1 - 5.minutes && 'ts <= 'ts1 + 10.minutes)
  .select('order_num, 'status, 'ts)
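For reference, the same interval join can also be written in SQL against the registered tables; a sketch under the registrations above (joinResultSql is just an illustrative name):

val joinResultSql = stenv.sqlQuery(
  """
    |SELECT s1.order_num, s1.status, s1.ts
    |FROM Stream1_Events AS s1, Stream2_Events AS s2
    |WHERE s1.order_num = s2.order_num
    |  AND s1.ts BETWEEN s2.ts - INTERVAL '5' MINUTE AND s2.ts + INTERVAL '10' MINUTE
  """.stripMargin)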

val Cbuilder: ClusterBuilder = new ClusterBuilder {
  override def buildCluster(builder: Cluster.Builder): Cluster =
    builder
      .addContactPoint("HostIP")
      .withPort(PORT)
      .withAuthProvider(new PlainTextAuthProvider("cassandra", "cassandra"))
      .build()
}

val sink: CassandraAppendTableSink = new CassandraAppendTableSink(
  Cbuilder,
  "INSERT INTO dm.order_join (order_num, status, ts) VALUES (?, ?, ?)")

stenv.registerTableSink(
  "cassandraOutputTable",
  Array[String]("order_num", "status", "ts"),
  Array[TypeInformation[_]](Types.STRING, Types.STRING, Types.SQL_TIMESTAMP),
  sink)

joinResult.insertInto("cassandraOutputTable")

senv.execute("Kafka Stream Table")
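If the table sink keeps failing, one workaround is to convert the joined table to a typed DataStream and use the plain CassandraSink instead of CassandraAppendTableSink; a sketch, assuming the field types declared above and wired in before senv.execute:

import org.apache.flink.streaming.connectors.cassandra.CassandraSink

// Sketch: the interval-join result is append-only, so toAppendStream is safe.
val rows: DataStream[(String, String, java.sql.Timestamp)] =
  joinResult.toAppendStream[(String, String, java.sql.Timestamp)]

CassandraSink.addSink(rows)
  .setQuery("INSERT INTO dm.order_join (order_num, status, ts) VALUES (?, ?, ?)")
  .setClusterBuilder(Cbuilder)
  .build()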


The exception details are below:

java.lang.UnsupportedOperationException
  at org.apache.flink.table.planner.expressions.RexNodeConverter.visit(RexNodeConverter.java:949)
  at org.apache.flink.table.planner.expressions.RexNodeConverter.visit(RexNodeConverter.java:889)
  at org.apache.flink.table.planner.expressions.RexNodeConverter.visit(RexNodeConverter.java:127)
  at org.apache.flink.table.expressions.UnresolvedCallExpression.accept(UnresolvedCallExpression.java:135)
  at org.apache.flink.table.planner.expressions.RexNodeConverter.convertReinterpretCast(RexNodeConverter.java:652)
  at org.apache.flink.table.planner.expressions.RexNodeConverter.lambda$new$1(RexNodeConverter.java:140)
  at org.apache.flink.table.planner.expressions.RexNodeConverter$RexNodeConversion.convert(RexNodeConverter.java:1040)
  at org.apache.flink.table.planner.expressions.RexNodeConverter.visit(RexNodeConverter.java:942)
  at org.apache.flink.table.planner.expressions.RexNodeConverter.visit(RexNodeConverter.java:889)
  at org.apache.flink.table.planner.expressions.RexNodeConverter.visit(RexNodeConverter.java:127)
  at org.apache.flink.table.expressions.UnresolvedCallExpression.accept(UnresolvedCallExpression.java:135)
  at org.apache.flink.table.planner.sources.TableSourceUtil$$anonfun$getRowtimeExtractionExpression$1.apply(TableSourceUtil.scala:293)
  at org.apache.flink.table.planner.sources.TableSourceUtil$$anonfun$getRowtimeExtractionExpression$1.apply(TableSourceUtil.scala:270)
  at scala.Option.map(Option.scala:146)
  at org.apache.flink.table.planner.sources.TableSourceUtil$.getRowtimeExtractionExpression(TableSourceUtil.scala:270)
  at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:117)
  at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:55)
  at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54)
  at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlan(StreamExecTableSourceScan.scala:55)
  at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange.translateToPlanInternal(StreamExecExchange.scala:84)
  at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange.translateToPlanInternal(StreamExecExchange.scala:44)
  at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54)
  at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange.translateToPlan(StreamExecExchange.scala:44)
  at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecWindowJoin.translateToPlanInternal(StreamExecWindowJoin.scala:135)
  at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecWindowJoin.translateToPlanInternal(StreamExecWindowJoin.scala:52)
  at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54)
  at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecWindowJoin.translateToPlan(StreamExecWindowJoin.scala:52)
  at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:86)
  at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:46)
  at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54)
  at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlan(StreamExecCalc.scala:46)
  at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToTransformation(StreamExecSink.scala:185)
  at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:133)
  at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:50)
  at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54)
  at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:50)
  at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:61)
  at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:60)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.Iterator$class.foreach(Iterator.scala:891)
  at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
  at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
  at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
  at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
  at scala.collection.AbstractTraversable.map(Traversable.scala:104)
  at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:60)
  at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:149)
  at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:439)
  at org.apache.flink.table.api.internal.TableEnvironmentImpl.insertInto(TableEnvironmentImpl.java:327)
  at org.apache.flink.table.api.internal.TableImpl.insertInto(TableImpl.java:411)
  ... 30 elided