Я загружаю код Flink-stream-demo, в https://github.com/dataArtisans/flink-streaming-demo
Версия Flink в этом примере 0.10.Поэтому я попытался исправить код, чтобы сделать его совместимым с FLINK 1.6.2.
Я получил следующую ошибку, которую не могу исправить:
Error:(72, 52) type mismatch;
found : org.apache.flink.streaming.api.datastream.DataStreamSource[com.dataartisans.flink_demo.datatypes.TaxiRide]
required: org.apache.flink.streaming.api.scala.DataStream[com.dataartisans.flink_demo.datatypes.TaxiRide]
val rides: DataStream[TaxiRide] = env.addSource(new TaxiRideSource(
Я думаю, что это проблема пакета импорта, но я не могу это исправить.Заголовок SlidingArrivalCount.scala равен
import com.dataartisans.flink_demo.datatypes.{TaxiRide, GeoPoint}
import com.dataartisans.flink_demo.sinks.ElasticsearchUpsertSink
import com.dataartisans.flink_demo.sources.TaxiRideSource
import com.dataartisans.flink_demo.utils.{DemoStreamEnvironment,NycGeoUtils}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
. Заголовок SourceFunction:
import com.dataartisans.flink_demo.datatypes.TaxiRide
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
Код ошибки:
val env: StreamExecutionEnvironment = DemoStreamEnvironment.env
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
// Define the data source
val rides: DataStream[TaxiRide] = env.addSource(new TaxiRideSource(
data, maxServingDelay, servingSpeedFactor))