Смогу ли я читать с определенных разделов из темы Кафки - PullRequest
0 голосов
/ 26 июня 2018

Я использую фреймворк spring-kafka для реализации слушателя kafka. В настоящее время я читаю из темы. Но, согласно определенному требованию, меня спросили, как реализовать слушателя, который будет читать из определенных разделов из темы кафки.

Это хорошая практика, когда разные приложения читаются из разных разделов. Не могли бы вы помочь со знанием того, как я могу реализовать это через Spring-Kafka и Java.

Заранее спасибо.

1 Ответ

0 голосов
/ 26 июня 2018

ContainerProperties, используемый для настройки KafkaMessageListenerContainer, имеет этот ctor:

public ContainerProperties(TopicPartitionInitialOffset... topicPartitions) {

Где это примерно:

**
 * A configuration container to represent a topic name, partition number and, optionally,
 * an initial offset for it. The initial offset can be:
 * <ul>
 * <li>{@code null} - do nothing;</li>
 * <li>positive (including {@code 0}) - seek to EITHER the absolute offset within the
 * partition or an offset relative to the current position for this consumer, depending
 * on {@link #isRelativeToCurrent()}.
 * </li>
 * <li>negative - seek to EITHER the offset relative to the current last offset within
 * the partition: {@code consumer.seekToEnd() + initialOffset} OR the relative to the
 * current offset for this consumer (if any), depending on
 * {@link #isRelativeToCurrent()}.</li>
 * </ul>
 * Offsets are applied when the container is {@code start()}ed.
 *
 * @author Artem Bilan
 * @author Gary Russell
 */
public class TopicPartitionInitialOffset {

Таким образом, вы можете указать, какой раздел из какой темы и какое смещение начать использовать.

Когда мы используем конфигурацию на основе разделов, мы получаем эту функцию на KafkaConsumer:

/**
 * Manually assign a list of partitions to this consumer. This interface does not allow for incremental assignment
 * and will replace the previous assignment (if there is one).
 * <p>
 * If the given list of topic partitions is empty, it is treated the same as {@link #unsubscribe()}.
 * <p>
 * Manual topic assignment through this method does not use the consumer's group management
 * functionality. As such, there will be no rebalance operation triggered when group membership or cluster and topic
 * metadata change. Note that it is not possible to use both manual partition assignment with {@link #assign(Collection)}
 * and group assignment with {@link #subscribe(Collection, ConsumerRebalanceListener)}.
 * <p>
 * If auto-commit is enabled, an async commit (based on the old assignment) will be triggered before the new
 * assignment replaces the old one.
 *
 * @param partitions The list of partitions to assign this consumer
 * @throws IllegalArgumentException If partitions is null or contains null or empty topics
 * @throws IllegalStateException If {@code subscribe()} is called previously with topics or pattern
 *                               (without a subsequent call to {@link #unsubscribe()})
 */
@Override
public void assign(Collection<TopicPartition> partitions) {

Итак, это действительно стандартная практика для потребления напрямую из определенных разделов.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...