Ошибка потребителя kafka при записи данных из входной темы в выходную тему с использованием потоков kafka - PullRequest
0 голосов
/ 25 июня 2018

Используя коннектор kafka, я записываю данные в формате avro в тему kafka, а затем, используя потоки kafka, отображаю некоторые значения и записываю вывод в другую тему, используя:

Stream.to("output_topic");

Мои данные записываются в выходную тему, но я столкнулся с проблемой смещения. Если у меня есть 25 записей в моей теме ввода, она записывает все 25 записей в мою тему вывода, но выдает ошибку как:

[2018-06-25 12:42:50,243] ERROR [ConsumerFetcher consumerId=console-consumer-3500_kafka-connector-1529910768088-712e7106,leaderId=0, fetcherId=0]Error due to(kafka.consumer.ConsumerFetcherThread)

kafka.common.KafkaException: Error processing data for partition Stream-0 offset 25

Вот моя полная ошибка:

> [2018-06-25 12:42:50,243] ERROR [ConsumerFetcher
> consumerId=console-consumer-3500_kafka-connector-1529910768088-712e7106,
> leaderId=0, fetcherId=0] Error due to
> (kafka.consumer.ConsumerFetcherThread) kafka.common.KafkaException:
> Error processing data for partition Stream-0 offset 25    at
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:204)
>   at
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:169)
>   at scala.Option.foreach(Option.scala:257)   at
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:169)
>   at
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:166)
>   at scala.collection.Iterator$class.foreach(Iterator.scala:891)  at
> scala.collection.AbstractIterator.foreach(Iterator.scala:1334)    at
> scala.collection.IterableLike$class.foreach(IterableLike.scala:72)    at
> scala.collection.AbstractIterable.foreach(Iterable.scala:54)  at
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply$mcV$sp(AbstractFetcherThread.scala:166)
>   at
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:166)
>   at
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:166)
>   at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:250)   at
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:164)
>   at
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:111)
>   at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)

> Caused by: java.lang.IllegalArgumentException: Illegal batch type
> class org.apache.kafka.common.record.DefaultRecordBatch. The older
> message format classes only support conversion from class
> org.apache.kafka.common.record.AbstractLegacyRecordBatch, which is
> used for magic v0 and v1  at
> kafka.message.MessageAndOffset$.fromRecordBatch(MessageAndOffset.scala:29)
>   at
> kafka.message.ByteBufferMessageSet$$anonfun$internalIterator$1.apply(ByteBufferMessageSet.scala:169)
>   at
> kafka.message.ByteBufferMessageSet$$anonfun$internalIterator$1.apply(ByteBufferMessageSet.scala:169)
>   at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)  at
> scala.collection.Iterator$class.toStream(Iterator.scala:1320)     at
> scala.collection.AbstractIterator.toStream(Iterator.scala:1334)   at
> scala.collection.TraversableOnce$class.toSeq(TraversableOnce.scala:298)
>   at scala.collection.AbstractIterator.toSeq(Iterator.scala:1334)     at
> kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:59)
>   at
> kafka.consumer.ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:87)
>   at
> kafka.consumer.ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:37)
>   at
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:183)
>   ... 15 more

1 Ответ

0 голосов
/ 12 июля 2018

Я получил ту же ошибку при использовании kafka-consumer-console.sh

Проблема была в том, что опция - zookeeper. Если вы укажете опцию —zookeeper, старый потребитель будет запущен по умолчанию, а магическая опция будет установлена ​​по умолчанию v0 или v1 (текущая версия kafka 1.1 использует v2). Вот почему происходит несовпадение версий.

Эту ошибку можно устранить с помощью параметра -bootstrap-server, а не -zookeeper. (Это означает запуск новой версии для потребителя)

Когда вы задаете опцию —bootstrap-server, должны быть указаны домен (или ip) брокера и номер порта. например, - сервер начальной загрузки kafka.domain: 9092, kafka2.domain: 9092

Порт по умолчанию для брокера (сервера Kafka) - 9092, и вы можете изменить порт в kafka / config / server.properties.

...