Flink Kinesis Connector не использует сообщения из потоков данных Kinesis на полную мощность - PullRequest
0 голосов
/ 24 марта 2020

Я проверяю, как быстро Apache Flink (используя v1.8.2) может читать сообщения из потока данных Kinesis. Kinesis Data Streams содержит только один фрагмент, и он содержит 40 000 сообщений. Размер каждого сообщения не превышает 5 КБ.

Пытаясь прочитать поток из самого старого сообщения с помощью TRIM_HORIZON, я ожидаю, что приложение должно иметь возможность быстро читать все сообщения, поскольку каждый шард может поддерживать до максимальной общей скорости чтения данных. 2 МБ в секунду через GetRecords. При настройке соединителя (SHARD_GETRECORDS_MAX = 400, SHARD_GETRECORDS_INTERVAL_MILLIS = 1000) приложение должно завершиться в течение нескольких минут, чтобы прочитать все сообщения. Но по какой-то причине чтение всех сообщений занимает много времени.

Не могли бы вы проверить, что не так в моей конфигурации коннектора? Ценю вашу помощь.

    public static DataStream<ObjectNode> createKinesisStream(
            StreamExecutionEnvironment env) throws IOException {
        Properties properties = new Properties();
        properties.put(ConsumerConfigConstants.AWS_REGION, "us-east-1");
        properties.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "TRIM_HORIZON");
        properties.put(ConsumerConfigConstants.SHARD_GETRECORDS_MAX, "400");
        properties.put(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS, "1000");

        properties.put(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "PROFILE");
        properties.put(ConsumerConfigConstants.AWS_PROFILE_NAME, "d");

        return env.addSource(new FlinkKinesisConsumer<>(
                    "stream1", new JsonNodeDeserializationSchema(), properties));
    }

   main() code:
   final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
   env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
   env.getConfig().setAutoWatermarkInterval(10000L);

   source = AppConfig.createKinesisStream(env);

   DataStream<ObjectNode> filteredStream = source
                .map(new CustomMap());
I have put a counter in discarding sink, in one fetch it read 27 messages( counter 829-855)

24 Mar 2020 08:11:50,519 INFO  DiscardingSink:15 - 827
24 Mar 2020 08:11:50,519 INFO  DiscardingSink:15 - 828
24 Mar 2020 08:11:51,631 INFO  DiscardingSink:15 - 829
24 Mar 2020 08:11:51,631 INFO  DiscardingSink:15 - 830
.
.
24 Mar 2020 08:11:51,639 INFO  DiscardingSink:15 - 854
24 Mar 2020 08:11:51,639 INFO  DiscardingSink:15 - 855
24 Mar 2020 08:11:52,749 INFO  DiscardingSink:15 - 856

1 Ответ

0 голосов
/ 24 марта 2020

Одним из возможных объяснений является то, что что-то в вашем трубопроводе оказывает обратное давление на источник. Чтобы измерить только емкость источника, вы можете упростить задачу до следующего:

source.addSink(new DiscardingSink<>());

, где DiscardingSink равно

public static class DiscardingSink<OUT> implements SinkFunction<OUT> {

    @Override
    public void invoke(OUT value, Context context) throws Exception {
    }
}
...