Вот код Java с использованием flink 1.4, отладка его на intelij
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
Properties kinesisConsumerConfig = new Properties();
kinesisConsumerConfig.setProperty(AWSConfigConstants.AWS_REGION, "us-west-2");
kinesisConsumerConfig.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "");
kinesisConsumerConfig.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "/Y");
kinesisConsumerConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_MAX, "4");
kinesisConsumerConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS, "2000");
kinesisConsumerConfig.setProperty(ConsumerConfigConstants.SHARD_DISCOVERY_INTERVAL_MILLIS, "2000");
kinesisConsumerConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_RETRIES, "5");
kinesisConsumerConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");
DataStream<Event> kinesisStream = env.addSource(new FlinkKinesisConsumer<>(
"ConversionScore",
new EventSchema(),
kinesisConsumerConfig)
);
DataStream<String> pickupCounts = kinesisStream
.map(trip -> Long.toString(trip.getTimestamp()))
.timeWindowAll(Time.seconds(3))
.apply(new AllWindowFunction<String, String, TimeWindow>() {
@Override
public void apply(TimeWindow window, Iterable<String> values, Collector<String> out) throws Exception {
StringBuilder stringBuilder = new StringBuilder();
for(String i : values) {
stringBuilder.append(i);
}
out.collect(stringBuilder.toString());
}
});
pickupCounts.print();
Но я ничего не получил, у Kinesis есть данные, его процесс muti-потока, если у него есть исключение, я обнаружилне смог бы увидеть, что я предполагаю