Соедините Flink Kafka с eventhub - PullRequest
0 голосов
/ 21 марта 2019

Я использую Apache Flink и пытаюсь подключиться к концентратору событий Azure, используя протокол Apache Kafka для получения сообщений от него. Мне удается подключиться к концентратору событий Azure и получать сообщения, но я не могу использовать функцию flink "setStartFromTimestamp (...)", как описано здесь (https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#kafka-consumers-start-position-configuration). Когда я пытаюсь получить некоторые сообщения из метки времени, Кафка сказал, что формат сообщения на стороне брокера - до 0.10.0. Кто-нибудь сталкивался с этим? Клиентская версия Apache Kafka - 2.0.1 Apache Flink версия 1.7.2

ОБНОВЛЕНО: попытался использовать примеры быстрого запуска Azure-Event-Hub (https://github.com/Azure/azure-event-hubs-for-kafka/tree/master/quickstart/java), в коде потребительского пакета добавлен код для получения смещения с отметкой времени, возвращается нулевое значение, как и ожидалось, если версия сообщения под версией 0.10.0 kafka.

        List<PartitionInfo> partitionInfos = consumer.partitionsFor(TOPIC);
        List<TopicPartition> topicPartitions = partitionInfos.stream().map(pi -> new TopicPartition(pi.topic(), pi.partition())).collect(Collectors.toList());
        Map<TopicPartition, Long> topicPartitionToTimestampMap = topicPartitions.stream().collect(Collectors.toMap(tp -> tp, tp -> 0L));
        Map<TopicPartition, OffsetAndTimestamp> offsetAndTimestamp = consumer.offsetsForTimes(topicPartitionToTimestampMap);
        System.out.println(offsetAndTimestamp);
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...