не получил ничего от кинезиса, используя потребителя Flink - PullRequest
0 голосов
/ 07 сентября 2018

Вот код Java с использованием flink 1.4, отладка его на intelij

  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

    Properties kinesisConsumerConfig = new Properties();
    kinesisConsumerConfig.setProperty(AWSConfigConstants.AWS_REGION, "us-west-2");
    kinesisConsumerConfig.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "");
    kinesisConsumerConfig.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "/Y");
    kinesisConsumerConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_MAX, "4");
    kinesisConsumerConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS, "2000");
    kinesisConsumerConfig.setProperty(ConsumerConfigConstants.SHARD_DISCOVERY_INTERVAL_MILLIS, "2000");
    kinesisConsumerConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_RETRIES, "5");
    kinesisConsumerConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");

    DataStream<Event> kinesisStream = env.addSource(new FlinkKinesisConsumer<>(
            "ConversionScore",
            new EventSchema(),
            kinesisConsumerConfig)
    );

    DataStream<String> pickupCounts = kinesisStream
            .map(trip -> Long.toString(trip.getTimestamp()))
            .timeWindowAll(Time.seconds(3))
            .apply(new AllWindowFunction<String, String, TimeWindow>() {
              @Override
              public void apply(TimeWindow window, Iterable<String> values, Collector<String> out) throws Exception {
                StringBuilder stringBuilder = new StringBuilder();
                for(String i : values)  {
                  stringBuilder.append(i);
                }
                out.collect(stringBuilder.toString());
              }
            });

    pickupCounts.print();

Но я ничего не получил, у Kinesis есть данные, его процесс muti-потока, если у него есть исключение, я обнаружилне смог бы увидеть, что я предполагаю

...