Ошибка конфигурации Kafka Streams KTable в Центре сообщений - PullRequest
0 голосов
/ 03 мая 2018

Эта проблема теперь решена в Центре сообщений

У меня возникли проблемы с созданием KTable в Кафке. Я новичок в Кафке, которая, вероятно, является корнем моей проблемы, но я подумал, что могу спросить здесь в любом случае. У меня есть проект, в котором я хотел бы отслеживать различные идентификаторы путем подсчета их общего числа. Я использую Message Hub в IBM Cloud для управления своими темами, и до сих пор он работал великолепно.

У меня есть тема в Message Hub, которая создает сообщения типа {"ID":"123","TIMESTAMP":"1525339553", "BALANCE":"100", "AMOUNT":"4"}, на данный момент единственным ключом релевантности является идентификатор.

Мой код Kafka вместе с конфигурацией Streams выглядит следующим образом:

import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, appId);
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, "3");    
props.put("security.protocol","SASL_SSL");
props.put("sasl.mechanism","PLAIN");
props.put("ssl.protocol","TLSv1.2");
props.put("ssl.enabled.protocols","TLSv1.2");
String saslJaasConfig = "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"USERNAME\" password=\"PASSWORD\";";
saslJaasConfig = saslJaasConfig.replace("USERNAME", user).replace("PASSWORD", password);
props.put("sasl.jaas.config",saslJaasConfig);

StreamsBuilder builder = new StreamsBuilder();

KStream<String, String> Kstreams = builder.stream(myTopic);

KTable<String, Long> eventCount = Kstreams
        .flatMapValues(value -> getID(value)) //function that retrieves the ID
        .groupBy((key, value) -> value)
        .count();

Когда я запускаю код, я получаю следующие ошибки:

Исключение в потоке "KTableTest-e2062d11-0b30-4ed0-82b0-00d83dcd9366-> StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Не удалось создать тему KTableTest-KSTREAM-AGGREGATE-STATE-STORE- STORE- 0000000003-передел.

Далее:

Вызывается: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.PolicyViolationException: неверная конфигурация: {сегмент.index.bytes = 52428800, сегмент.bytes = 52428800, cleanup.policy = удалить, segment.ms = 600000}. Только разрешенные конфиги: [retention.ms, cleanup.policy]

Понятия не имею, почему возникает эта ошибка и что с этим можно сделать. Является ли способ, которым я построил KStream и KTable, как-то неправильно? Или, возможно, концентратор сообщений на Bluemix?

Решено:

Добавление выдержки из комментариев ниже ответа, который я пометил как правильный. Оказалось, что мой StreamsConfig был в порядке, и что (на данный момент) есть проблема на стороне Message Hub, но есть обходной путь:

Оказывается, у Message Hub возникла проблема при создании тем для перераспределения с помощью Kafka Streams 1.1. Пока мы работаем над исправлением, вам нужно будет вручную создать раздел «KTableTest-KSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition». Для этого нужно столько разделов, сколько для вашей темы ввода (myTopic) и установить максимальное время хранения. Я опубликую еще один комментарий, как только он будет исправлен

Большое спасибо за помощь!

1 Ответ

0 голосов
/ 03 мая 2018

Message Hub имеет некоторые ограничения на конфигурации, которые можно использовать при создании тем.

Из полученного исключения PolicyViolationEx похоже, что ваше потоковое приложение пыталось использовать несколько недопустимых конфигов:

  • segment.index.bytes
  • segment.bytes
  • segment.ms

Полагаю, вы установили их где-то в конфигурации потоков, и они должны быть удалены.

Обратите внимание, что вам также нужно установить StreamsConfig.REPLICATION_FACTOR_CONFIG на 3 в вашей конфигурации для работы с Message Hub, как указано в наших документах .

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...