Using a Kafka connector, I write Avro-formatted data to a Kafka topic. Then, with Kafka Streams, I map some values and write the output to another topic using:
Stream.to("output_topic");
The data is written to the output topic, but I am running into an offset problem. If my input topic has 25 records, all 25 are written to the output topic, but the following error is raised:
[2018-06-25 12:42:50,243] ERROR [ConsumerFetcher consumerId=console-consumer-3500_kafka-connector-1529910768088-712e7106, leaderId=0, fetcherId=0] Error due to (kafka.consumer.ConsumerFetcherThread)
kafka.common.KafkaException: Error processing data for partition Stream-0 offset 25
Here is the full error:
> [2018-06-25 12:42:50,243] ERROR [ConsumerFetcher
> consumerId=console-consumer-3500_kafka-connector-1529910768088-712e7106,
> leaderId=0, fetcherId=0] Error due to
> (kafka.consumer.ConsumerFetcherThread) kafka.common.KafkaException:
> Error processing data for partition Stream-0 offset 25 at
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:204)
> at
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:169)
> at scala.Option.foreach(Option.scala:257) at
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:169)
> at
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:166)
> at scala.collection.Iterator$class.foreach(Iterator.scala:891) at
> scala.collection.AbstractIterator.foreach(Iterator.scala:1334) at
> scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at
> scala.collection.AbstractIterable.foreach(Iterable.scala:54) at
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply$mcV$sp(AbstractFetcherThread.scala:166)
> at
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:166)
> at
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:166)
> at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:250) at
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:164)
> at
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:111)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
> Caused by: java.lang.IllegalArgumentException: Illegal batch type
> class org.apache.kafka.common.record.DefaultRecordBatch. The older
> message format classes only support conversion from class
> org.apache.kafka.common.record.AbstractLegacyRecordBatch, which is
> used for magic v0 and v1 at
> kafka.message.MessageAndOffset$.fromRecordBatch(MessageAndOffset.scala:29)
> at
> kafka.message.ByteBufferMessageSet$$anonfun$internalIterator$1.apply(ByteBufferMessageSet.scala:169)
> at
> kafka.message.ByteBufferMessageSet$$anonfun$internalIterator$1.apply(ByteBufferMessageSet.scala:169)
> at scala.collection.Iterator$$anon$11.next(Iterator.scala:410) at
> scala.collection.Iterator$class.toStream(Iterator.scala:1320) at
> scala.collection.AbstractIterator.toStream(Iterator.scala:1334) at
> scala.collection.TraversableOnce$class.toSeq(TraversableOnce.scala:298)
> at scala.collection.AbstractIterator.toSeq(Iterator.scala:1334) at
> kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:59)
> at
> kafka.consumer.ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:87)
> at
> kafka.consumer.ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:37)
> at
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:183)
> ... 15 more