Ошибка Kafka Streams «TaskAssignmentException: невозможно декодировать данные подписки: версия = 4» - PullRequest
0 голосов
/ 27 сентября 2019

При развертывании только с измененной версией Kafka-Streams с 1.1.1 на 2.x.x (без изменения application.id) мы получили исключения на узле приложения с более старой версией Kafka-Streams, и в результате потоки Kafka изменили состояниеиз-за ошибки и закрытия, тем временем узел приложения с новой версией Kafka-Streams нормально принимает сообщения.

Если мы обновимся с 1.1.1 до 2.0.0, получим ошибку unable to decode subscription data: version=3;если от 1.1.1 до 2.3.0: unable to decode subscription data: version=4.Это может быть очень болезненно при развертывании канареек, например, у нас есть 3 узла приложений с предыдущей версией Kafka-Streams, и когда мы добавим еще один узел с новой версией, все существующие 3 узла будут в состоянии ошибки.Трассировка стека ошибок:

TaskAssignmentException: unable to decode subscription data: version=4
    at org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo.decode(SubscriptionInfo.java:128)
    at org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.assign(StreamPartitionAssignor.java:297)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:358)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:520)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$1100(AbstractCoordinator.java:93)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:472)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:455)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:822)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:802)
    at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:204)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:563)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:390)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:293)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:193)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:364)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:316)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:290)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1149)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1115)
    at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:831)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:788)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:749)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:719)

Проблема воспроизводима на обеих версиях брокера Kafka 1.1.0 и 2.1.1, даже с простым примером DSL Kafka-Streams:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("default.key.serde", "org.apache.kafka.common.serialization.Serdes$StringSerde");
props.put("default.value.serde", "org.apache.kafka.common.serialization.Serdes$StringSerde");
props.put("application.id", "xxx");

StreamsBuilder streamsBuilder = new StreamsBuilder();
streamsBuilder.<String, String>stream("source")
        .mapValues(value -> value + value)
        .to("destination");
KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(), props);

Является ли этобаг кафка-потоков?Существует ли обходной путь для предотвращения такого сбоя?

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...