Где именно раздел по умолчанию определен в производителе кафки? - PullRequest
0 голосов
/ 11 мая 2018

Я пытаюсь найти исходный код, и все, что я могу найти, это

/Users/myUser/.m2/repository/org/apache/kafka/kafka_2.10/0.9.0.2/kafka_2.10-0.9.0.2.jar!/kafka/javaapi/producer/Producer.class

public void send(List<KeyedMessage<K, V>> messages) {
   this.underlying().send(scala.collection.JavaConversions..MODULE$.asScalaBuffer(messages).toSeq());
}

, который вызывает

/Users/myUser/.m2/repository/org/apache/kafka/kafka_2.10/0.9.0.2/kafka_2.10-0.9.0.2.jar!/kafka/producer/Producer.class

public void send(Seq<KeyedMessage<K, V>> messages) {
    synchronized(this.lock()) {
        if(this.hasShutdown().get()) {
            throw new ProducerClosedException();
        } else {
            this.recordStats(messages);
            boolean var3 = this.sync();
            BoxedUnit var4;
            if(var3) {
                this.eventHandler().handle(messages);
                var4 = BoxedUnit.UNIT;
            } else {
                if(var3) {
                    throw new MatchError(BoxesRunTime.boxToBoolean(var3));
                }

                this.asyncSend(messages);
                var4 = BoxedUnit.UNIT;
            }

            BoxedUnit var10000 = BoxedUnit.UNIT;
        }
    }
}

Это довольно запутанно, потому что я ожидаю, что обе функции send будут принимать идентификатор раздела из раздела по умолчанию как

public void send(List<KeyedMessage<K, V>> messages, int partition id)

или как-то вызвать раздел по умолчанию дляполучить идентификатор раздела в определении метода send

Но я не вижу нигде используемого идентификатора раздела.

Может кто-нибудь указать мне направление, где искать, где находится разделрешил?

Это прежде чем мы позвоним producer.send?

1 Ответ

0 голосов
/ 11 мая 2018

Вы смотрите на старый API, эта логика в EventHandler , которая вызывается методом send


Для новых клиентов вы можете увидеть это в методе doSend

 int partition = partition(record, serializedKey, serializedValue, cluster);
 tp = new TopicPartition(record.topic(), partition);

Класс разделителя загружается из свойств, переданных источнику . Если не определено, это значение по умолчанию

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...