public class MyRebalanceListener implements ConsumerAwareRebalanceListener {
@Override
public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
long rewindTo = System.currentTimeMillis() - 60000;
Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes = consumer.offsetsForTimes(partitions.stream()
.collect(Collectors.toMap(tp -> tp, tp -> rewindTo)));
offsetsForTimes.forEach((k, v) -> consumer.seek(k, v.offset()));
}
}
и
@RunWith(SpringRunner.class)
@SpringBootTest
public class So52973119ApplicationTests {
@Test
public void rebalanceListenerTests() {
MyRebalanceListener listener = new MyRebalanceListener();
Consumer<?, ?> consumer = mock(Consumer.class);
AtomicLong expected = new AtomicLong(System.currentTimeMillis() - 60_000);
given(consumer.offsetsForTimes(anyMap())).willAnswer(i -> {
AtomicLong offset = new AtomicLong();
Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes = new HashMap<>();
Map<TopicPartition, Long> argument = i.getArgument(0);
argument.forEach((k, v) -> {
offsetsForTimes.put(k, new OffsetAndTimestamp(offset.incrementAndGet(), 0L));
assertThat(v).isBetween(expected.get(), expected.get() + 1_000);
});
return offsetsForTimes ;
});
TopicPartition t1 = new TopicPartition("foo", 0);
TopicPartition t2 = new TopicPartition("foo", 1);
List<TopicPartition> partitions = new ArrayList<>();
partitions.add(t1);
partitions.add(t2);
listener.onPartitionsAssigned(consumer, partitions);
verify(consumer).seek(t1, 1);
verify(consumer).seek(t2, 2);
}
}