ContainerProperties
, используемый для настройки KafkaMessageListenerContainer
, имеет этот ctor:
public ContainerProperties(TopicPartitionInitialOffset... topicPartitions) {
Где это примерно:
**
* A configuration container to represent a topic name, partition number and, optionally,
* an initial offset for it. The initial offset can be:
* <ul>
* <li>{@code null} - do nothing;</li>
* <li>positive (including {@code 0}) - seek to EITHER the absolute offset within the
* partition or an offset relative to the current position for this consumer, depending
* on {@link #isRelativeToCurrent()}.
* </li>
* <li>negative - seek to EITHER the offset relative to the current last offset within
* the partition: {@code consumer.seekToEnd() + initialOffset} OR the relative to the
* current offset for this consumer (if any), depending on
* {@link #isRelativeToCurrent()}.</li>
* </ul>
* Offsets are applied when the container is {@code start()}ed.
*
* @author Artem Bilan
* @author Gary Russell
*/
public class TopicPartitionInitialOffset {
Таким образом, вы можете указать, какой раздел из какой темы и какое смещение начать использовать.
Когда мы используем конфигурацию на основе разделов, мы получаем эту функцию на KafkaConsumer
:
/**
* Manually assign a list of partitions to this consumer. This interface does not allow for incremental assignment
* and will replace the previous assignment (if there is one).
* <p>
* If the given list of topic partitions is empty, it is treated the same as {@link #unsubscribe()}.
* <p>
* Manual topic assignment through this method does not use the consumer's group management
* functionality. As such, there will be no rebalance operation triggered when group membership or cluster and topic
* metadata change. Note that it is not possible to use both manual partition assignment with {@link #assign(Collection)}
* and group assignment with {@link #subscribe(Collection, ConsumerRebalanceListener)}.
* <p>
* If auto-commit is enabled, an async commit (based on the old assignment) will be triggered before the new
* assignment replaces the old one.
*
* @param partitions The list of partitions to assign this consumer
* @throws IllegalArgumentException If partitions is null or contains null or empty topics
* @throws IllegalStateException If {@code subscribe()} is called previously with topics or pattern
* (without a subsequent call to {@link #unsubscribe()})
*/
@Override
public void assign(Collection<TopicPartition> partitions) {
Итак, это действительно стандартная практика для потребления напрямую из определенных разделов.