Существующая внутренняя тема имеет недопустимые разделы - PullRequest
0 голосов
/ 23 октября 2018

При запуске нашего приложения Kafka Streams в тестовой установке только с одним брокером Kafka мы видим следующую ошибку примерно в 1 из 15 запусков:

org.apache.kafka.streams.errors.StreamsException: Existing internal topic alarm-message-streams-by-organization-repartition has invalid partitions: expected: 32; actual: 12. Use 'kafka.tools.StreamsResetter' tool to clean up invalid topics before processing.

Когда мы видим ошибку выше фактического количества разделовварьируется (ожидается 32, фактическое выше 0 и ниже 32).

Мы выполняем org.apache.kafka.streams.KafkaStreams#cleanUp перед вызовом org.apache.kafka.streams.KafkaStreams#start.Брокер Kafka запускается без данных (используя https://hub.docker.com/r/wurstmeister/kafka/) для каждого запуска теста.

При просмотре журнала для брокера Kafka мы видим следующее:

2018-10-22 18:41:31,373] INFO Topic creation Map(
    alarm-message-streams-by-organization-repartition-19 -> ArrayBuffer(42),
    alarm-message-streams-by-organization-repartition-22 -> ArrayBuffer(42),
    alarm-message-streams-by-organization-repartition-0 -> ArrayBuffer(42),
    alarm-message-streams-by-organization-repartition-7 -> ArrayBuffer(42),
    alarm-message-streams-by-organization-repartition-23 -> ArrayBuffer(42),
    alarm-message-streams-by-organization-repartition-1 -> ArrayBuffer(42),
    alarm-message-streams-by-organization-repartition-24 -> ArrayBuffer(42),
    alarm-message-streams-by-organization-repartition-2 -> ArrayBuffer(42),
    alarm-message-streams-by-organization-repartition-30 -> ArrayBuffer(42),
    alarm-message-streams-by-organization-repartition-5 -> ArrayBuffer(42),
    alarm-message-streams-by-organization-repartition-21 -> ArrayBuffer(42),
    alarm-message-streams-by-organization-repartition-8 -> ArrayBuffer(42),
    alarm-message-streams-by-organization-repartition-14 -> ArrayBuffer(42),
    alarm-message-streams-by-organization-repartition-15 -> ArrayBuffer(42),
    alarm-message-streams-by-organization-repartition-6 -> ArrayBuffer(42),
    alarm-message-streams-by-organization-repartition-16 -> ArrayBuffer(42),
    alarm-message-streams-by-organization-repartition-31 -> ArrayBuffer(42),
    alarm-message-streams-by-organization-repartition-25 -> ArrayBuffer(42),
    alarm-message-streams-by-organization-repartition-9 -> ArrayBuffer(42),
    alarm-message-streams-by-organization-repartition-20 -> ArrayBuffer(42),
    alarm-message-streams-by-organization-repartition-29 -> ArrayBuffer(42),
    alarm-message-streams-by-organization-repartition-13 -> ArrayBuffer(42),
    alarm-message-streams-by-organization-repartition-26 -> ArrayBuffer(42),
    alarm-message-streams-by-organization-repartition-17 -> ArrayBuffer(42),
    alarm-message-streams-by-organization-repartition-4 -> ArrayBuffer(42),
    alarm-message-streams-by-organization-repartition-10 -> ArrayBuffer(42),
    alarm-message-streams-by-organization-repartition-3 -> ArrayBuffer(42),
    alarm-message-streams-by-organization-repartition-11 -> ArrayBuffer(42),
    alarm-message-streams-by-organization-repartition-12 -> ArrayBuffer(42),
    alarm-message-streams-by-organization-repartition-28 -> ArrayBuffer(42),
    alarm-message-streams-by-organization-repartition-27 -> ArrayBuffer(42),
    alarm-message-streams-by-organization-repartition-18 -> ArrayBuffer(42)
) (kafka.zk.AdminZkClient)

Itпохоже, что тема создана с ожидаемым числом разделов (32). Позже, в том же журнале, похоже, что есть запрос на создание темы снова. Мы не знаем, почему это происходит, но, по крайней мере, запрос все ещесодержит ожидаемое количество разделов (32):

[2018-10-22 18:43:29,851] INFO [Admin Manager on Broker 42]: Error processing create topic request for topic alarm-message-streams-by-organization-repartition with arguments (numPartitions=32, replicationFactor=1, replicasAssignments={}, configs={cleanup.policy=delete, segment.bytes=52428800, segment.ms=600000, retention.ms=9223372036854775807, segment.index.bytes=52428800}) (kafka.server.AdminManager)
org.apache.kafka.common.errors.TopicExistsException: Topic 'alarm-message-streams-by-organization-repartition' already exists.

Мы никогда не видели, чтобы это происходило без тестирования, когда мы работали с 6 брокерами Kafka. Однако мы выполняем значительно большее количество тестовых прогоновчем развертывается в non-test.

Примечание: это не всегда одна и та же тема, которая вызывает ошибку.

Ошибка вызывает ошибки в наших настройках теста, поэтому мы хотели бы понять, почемуэто случается и справляется с этим. Кто-нибудь может дать некоторое представление об этом поведении Kafka Streams?

Мы используем Kafka и Kafka Streams 2.0.0.

1 Ответ

0 голосов
/ 24 октября 2018

Похоже, что неполные / неправильные метаданные получены из кластера Kafka (т. Е. Ваш единственный брокер).При запуске (или, если быть более точным, в каждом ребалансировке) Kafka Streams проверяет, существуют ли внутренние темы с ожидаемым количеством разделов.Если тема не существует, она создается (это должно происходить только один раз за время работы приложения).Если он существует с правильным количеством разделов, используется тема.Если тема существует с неправильным числом разделов, выдается исключение, о котором вы сообщаете.

Вызов KafkaStreams#cleanup() не должен иметь здесь никакого влияния.Это не то же самое, что StreamResetter, что вы можете звонить через bin/kafka-streams-application-reset.sh (ср. https://kafka.apache.org/20/documentation/streams/developer-guide/app-reset-tool.html)

. В настоящий момент я понятия не имею, что может быть основной причиной проблемы, т. Е. ПочемуKafka Streams получил неверные метаданные темы. Надеюсь, это поможет.

...