Поток Кафки: есть ли способ игнорировать определенные смещения в разделе темы при записи в другую тему - PullRequest
0 голосов
/ 15 мая 2018

Справочная информация: я использовал неправильный реестр схемы avro при создании темы prod, и в результате соединение kafka прервалось из-за сообщений с неверным идентификатором схемы. Поэтому в качестве плана восстановления мы хотели скопировать сообщения в теме prod в тестовую тему, а затем напишите хорошие сообщения в hdfs. Но мы сталкиваемся с проблемами с определенными смещениями, которые имеют неверный идентификатор схемы при чтении из темы prod. Есть способ игнорировать такие смещения при записи в другую тему.

 Exception in thread "StreamThread-1" 
 org.apache.kafka.streams.errors.StreamsException: Failed to deserialize value 
 for record. topic=xxxx, partition=9, offset=1259032
  Caused by: org.apache.kafka.common.errors.SerializationException: Error 
  retrieving Avro schema for id 600
  Caused by: 

  io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: 
   Schema not found io.confluent.rest.exceptions.RestNotFoundException: Schema not found
  io.confluent.rest.exceptions.RestNotFoundException: Schema not found

{код}

1 Ответ

0 голосов
/ 15 мая 2018

Вы можете изменить обработчик исключений десериализации, чтобы пропустить эти записи, как описано в документации: https://docs.confluent.io/current/streams/faq.html#handling-corrupted-records-and-deserialization-errors-poison-pill-records

Т.е. вы устанавливаете LogAndContinueExceptionHandler в конфигурации через параметр default.deserialization.exception.handler.

...