Предлагает ли Kafka API и / или протокол способ запрашивать свойства сервера? - PullRequest
0 голосов
/ 23 октября 2018

Я пишу производителю Kafka, который иногда отправляет запрос с пакетом сообщений, который превышает максимально допустимый размер запроса.Кажется, у меня нет прямого доступа к свойствам сервера для кластера Kafka, на который я отправляю сообщения, и я не нашел способа запросить у сервера значения, установленные в файлах server.properties.

Пример

Попытка отправить слишком большое сообщение приведет к запуску журнала Kafka с сообщением ...

11:47:37 kafka.1     | Topic and partition to exceptions: 
page-visits-0 -> org.apache.kafka.common.errors.RecordTooLargeException 
(kafka.server.KafkaApis)

Ответы [ 2 ]

0 голосов
/ 23 октября 2018

Вы можете использовать KafkaAdminClient API для получения информации о кластере.Он может дать уровень брокера, а также информацию на уровне темы.Ниже приведен код конфигурации сервера для каждого узла.

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.DescribeClusterResult;
import org.apache.kafka.clients.admin.DescribeConfigsResult;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.config.ConfigResource.Type;


public class ListTopics {

    public static void main(String[] args) throws IOException, InterruptedException, ExecutionException {
        Properties prop = new Properties();
        prop.setProperty("bootstrap.servers","localhost:9092");
        AdminClient admin = AdminClient.create(prop);
        DescribeClusterResult describeClusterResult = admin.describeCluster();
        List<Node> nodes = new ArrayList<>(describeClusterResult.nodes().get());
        // Pass the broker node ID here. You can use for loop in case of multiple broker nodes.
        ConfigResource resource = new ConfigResource(Type.BROKER, String.valueOf(nodes.get(0).id()));

        DescribeConfigsResult configs = admin.describeConfigs(Collections.singletonList(resource));
        Map<ConfigResource, Config> config = configs.all().get();
        System.out.println(config   );
    }
}

PS Этот API можно использовать только для установок Kafka 0.11 и выше.

0 голосов
/ 23 октября 2018

Предполагая, что ваш кластер работает как минимум с Kafka 0.11, вы можете использовать API AdminClient describeConfigs() для получения конфигураций брокера.

Например:

Properties configs = new Properties();
configs.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
AdminClient client = AdminClient.create(configs);

List<ConfigResource> resources = Arrays.asList(new ConfigResource(Type.BROKER, "0"));
DescribeConfigsResult dcr = client.describeConfigs(resources);
for (Map.Entry<ConfigResource, Config> entry : dcr.all().get().entrySet()) {
    System.out.println(entry.getKey() + " - " + entry.getValue());
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...