Я не эксперт, но я попытаюсь объяснить, как вы могли бы это сделать.
Прежде всего, мы должны упомянуть методы подписки и назначения.
Когда Вы используете подписку, вы передаете одну или несколько тем. При этом список разделов каждой темы присваивается потребителю в зависимости от количества потребителей в его группе. Раздел topi c - это объект, образованный именем topi c и номером раздела.
consumer.Subscribe(Topic);
Вы можете использовать assign, чтобы передать разделы, которые будет читать потребитель. Этот метод не использует функциональные возможности управления группами потребителей (где нет необходимости в group.id). Если я не ошибаюсь, в методе assign вы можете указать начальное смещение.
consumer.Assign(topicName, 0, new Offset(lastConsumedOffset));
consumer.Assign(topicPartition, new Offset(lastConsumedOffset));
Другой вариант - использовать Метод seek () для установки смещения
consumer.Seek(topicPartitionOffset);
Если вы собираетесь смешивать подписку и назначать, помните, что вы должны использовать отписку раньше.
Другой вариант, если вы хотите Чтобы повторно использовать все сообщения, создайте потребителя в новой другой группе потребителей.
ПРИМЕР (ОБЗОР)
Я оставляю вас и пример на данный момент, я буду проверьте это позже. Я сделал пример в java, потому что я более знаком с ним. В этом примере я не использую подписку, я использую assign. Сначала извлекаются разделы topi c, мы устанавливаем дату и время начала чтения сообщений, создаем карту, указывающую, что это время и дата для каждого раздела.
С созданной картой мы получаем смещение каждого раздела на указанная дата-время с методом offsetsForTimes. Со смещением каждого раздела мы используем search для перехода к этому смещению на каждом разделе и, наконец, мы потребляем сообщения.
Сейчас у меня нет времени, чтобы проверить код, но я сделаю это. Надеюсь, это поможет.
AdminClient client = AdminClient.create(getAdminClientProperties());
KafkaConsumer<String, GenericRecord> consumer = new KafkaConsumer<String, GenericRecord>(
getConsumerProperties());
String TOPIC = "topic";
// get info of all partitions of a topic
List<PartitionInfo> partitionsInfo = consumer.partitionsFor(TOPIC);
// create TopicPartition list
Set<TopicPartition> partitions = new HashSet<>();
for (PartitionInfo p : partitionsInfo) {
partitions.add(new TopicPartition(p.topic(), p.partition()));
}
// Consumer will read from all partitions
consumer.assign(partitions);
DateTime timeToStartReadMessagesFrom = new DateTime(2020, 3, 1, 0, 0, 0);
Map<TopicPartition, Long> timestamps = new HashMap<>();
for (TopicPartition tp : partitions) {
timestamps.put(tp, timeToStartReadMessagesFrom.getMillis());
}
// get the offset for that time in each partition
Map<TopicPartition, OffsetAndTimestamp> offsets = consumer.offsetsForTimes(timestamps);
for (TopicPartition tp : partitions) {
consumer.seek(tp, offsets.get(tp).offset());
}
while (true) {
final ConsumerRecords<String, GenericRecord> consumerRecords = consumer.poll(1000);
// do something
break;
}
consumer.close();
System.out.println("DONE");