Почему поток Кафки не смог произвести данные после долгого времени? - PullRequest
0 голосов
/ 30 декабря 2018

KafkaStream не смог произвести данные после долгого времени.(Превышение установленного времени истечения)

Даже KafkaStream был мертв после сообщения об ошибке регистрации.

Исключение ниже:

org.apache.kafka.common.KafkaException: Cannot execute transactional method because we are in an error state
at org.apache.kafka.clients.producer.internals.TransactionManager.maybeFailWithError(TransactionManager.java:784)
at org.apache.kafka.clients.producer.internals.TransactionManager.beginAbort(TransactionManager.java:229)
at org.apache.kafka.clients.producer.KafkaProducer.abortTransaction(KafkaProducer.java:660)
at org.apache.kafka.streams.processor.internals.StreamTask.closeSuspended(StreamTask.java:493)
at org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:553)
at org.apache.kafka.streams.processor.internals.AssignedTasks.close(AssignedTasks.java:405)
at org.apache.kafka.streams.processor.internals.TaskManager.shutdown(TaskManager.java:260)
at org.apache.kafka.streams.processor.internals.StreamThread.completeShutdown(StreamThread.java:1111)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:730)
org.apache.kafka.common.KafkaException: Unexpected error in AddOffsetsToTxnResponse: The producer attempted to use a producer id which is not currently assigned to its transactional id
at org.apache.kafka.clients.producer.internals.TransactionManager$AddOffsetsToTxnHandler.handleResponse(TransactionManager.java:1237)
at org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:907)
at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:101)
at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:482)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:474)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:216)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:163)
at java.lang.Thread.run(Thread.java:834)

Версия:

  1. Кафка Брокер: 2.0.0
  2. Кафка-клиенты: 1.1.1
  3. Кафка-потоки: 1.1.1

(Брокер и Производитель)все по умолчанию:

  1. TRANSACTION_TIMEOUT_CONFIG
  2. транзакция.id.expiration.ms
  3. транзакция.max.timeout.ms

код:

Properties properties = new Properties();
properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
properties.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);

StreamsBuilder builder = new StreamsBuilder();
builder.stream("from", Consumed.with(Serdes.Integer(), Serdes.String()))
       .peek((key, value) -> System.out.println(value))
       .to("to", Produced.with(Serdes.Integer(), Serdes.String()), (key, value, numPartitions) -> key % numPartitions));

KafkaStreams streams = new KafkaStreams(bulider.build(), properties);
stream.start();

1 Ответ

0 голосов
/ 03 января 2019

Из сообщения об ошибке, кажется, здесь есть несколько неизвестных проблем:

  1. Внутри производителя, мы специально не обрабатываем INVALID_PRODUCER_ID_MAPPING в AddOffsetsToTxnHandler#handleResponse, это вызвалогенерируется фатальная ошибка с KafkaException.

  2. Внутри потоков мы проглатываем исключение ProducerFencedException, но из-за 1) генерируется фатальное исключение KafkaException, в результате чего он уходит и умирает напрямую.

Поведение пункта 1) не является обязательным, но я допускаю, что у него действительно есть несколько проблем в качестве запоздалой мысли:

a.Вообще говоря, дела, огороженные производителем, включая INVALID_PRODUCER_ID_MAPPING, должны обрабатываться лучше, чем 1) выше.Это рассматривается как https://cwiki.apache.org/confluence/display/KAFKA/KIP-360%3A+Improve+handling+of+unknown+producer

b.Txn Producer должен лучше различать «фатальные» и нефатальные ошибки, когда последние должны обрабатываться внутри системы, чем когда-либо передаваться вызывающей стороне.Одна быстрая мысль состоит в том, что, кроме ошибки, связанной с производителем, все другие ошибки, которые мы разработали до сих пор, должны рассматриваться как нефатальные и, следовательно, обрабатываться внутренне.

...