Я использую Apache Flink и пытаюсь подключиться к концентратору событий Azure, используя протокол Apache Kafka для получения сообщений от него. Мне удается подключиться к концентратору событий Azure и получать сообщения, но я не могу использовать функцию flink "setStartFromTimestamp (...)", как описано здесь (https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#kafka-consumers-start-position-configuration).
Когда я пытаюсь получить некоторые сообщения из метки времени, Кафка сказал, что формат сообщения на стороне брокера - до 0.10.0.
Кто-нибудь сталкивался с этим?
Клиентская версия Apache Kafka - 2.0.1
Apache Flink версия 1.7.2
ОБНОВЛЕНО: попытался использовать примеры быстрого запуска Azure-Event-Hub (https://github.com/Azure/azure-event-hubs-for-kafka/tree/master/quickstart/java), в коде потребительского пакета добавлен код для получения смещения с отметкой времени, возвращается нулевое значение, как и ожидалось, если версия сообщения под версией 0.10.0 kafka.
List<PartitionInfo> partitionInfos = consumer.partitionsFor(TOPIC);
List<TopicPartition> topicPartitions = partitionInfos.stream().map(pi -> new TopicPartition(pi.topic(), pi.partition())).collect(Collectors.toList());
Map<TopicPartition, Long> topicPartitionToTimestampMap = topicPartitions.stream().collect(Collectors.toMap(tp -> tp, tp -> 0L));
Map<TopicPartition, OffsetAndTimestamp> offsetAndTimestamp = consumer.offsetsForTimes(topicPartitionToTimestampMap);
System.out.println(offsetAndTimestamp);