Проблемы чтения сообщений в кодировке AVRO (созданных потоком KSQL) с помощью Kafka Connect - PullRequest
0 голосов
/ 31 октября 2019

происходит что-то странное, когда мы создаем сообщения AVRO через KSQL и пытаемся использовать их с помощью Kafka Connect. Немного контекста:

Исходные данные Сторонний поставщик создает данные для одного из наших кластеров Kafka в формате JSON (пока что это хорошо). Фактически мы видим поступающие данные.

Преобразование данных Поскольку нашим внутренним системам требуется кодировать данные в AVRO, мы создали кластер KSQL, который преобразует входящие данные в AVRO, создавая следующеепоток в KSQL:

{
    "ksql": "
        CREATE STREAM src_stream (browser_name VARCHAR)
        WITH (KAFKA_TOPIC='json_topic', VALUE_FORMAT='JSON');

        CREATE STREAM sink_stream WITH (KAFKA_TOPIC='avro_topic',VALUE_FORMAT='AVRO',  PARTITIONS=1, REPLICAS=3) AS
        SELECT * FROM src_stream;
    ",
    "streamsProperties": {
        "ksql.streams.auto.offset.reset": "earliest"
    }
}

(пока что все хорошо)

Мы видим данные, создаваемые из темы JSON в тему AVRO, по мере увеличения смещения.

Затем мы создаем соединитель Kafka в (новом) кластере Kafka Connect. В качестве некоторого контекста мы используем несколько кластеров Kafka Connect (с одинаковыми свойствами для этих кластеров), и поэтому у нас есть кластер Kafka Connect, работающий для этих данных, но точная копия кластера для других данных AVRO (1 дляаналитика, 1 для наших бизнес-данных).

Раковина для этого разъема - BigQuery, мы используем Wepay BigQuery Sink Connector 1.2.0. Опять, пока, все хорошо. Наш бизнес-кластер работает нормально с этим соединителем, а темы AVRO в бизнес-кластере перенаправляются в BigQuery.

Однако, когда мы пытаемся использовать тему AVRO, созданную нашим оператором KSQL ранее, мы видим исключение: /

Исключением является следующее:

org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
 at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
 at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
 at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:510)
 at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:490)
 at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:321)
 at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:225)
 at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:193)
 at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
 at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
 at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
 at java.util.concurrent.FutureTask.run(FutureTask.java:266)
 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
 at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.connect.errors.DataException: dpt_video_event-created_v2
 at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:98)
 at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$0(WorkerSinkTask.java:510)
 at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
 at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
 ... 13 more
Caused by: org.apache.kafka.common.errors.SerializationException: Error retrieving Avro schema for id 0
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema not found; error code: 40403
 at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:209)
 at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:235)
 at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:415)
 at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:408)
 at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaByIdFromRegistry(CachedSchemaRegistryClient.java:123)
 at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getBySubjectAndId(CachedSchemaRegistryClient.java:190)
 at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getById(CachedSchemaRegistryClient.java:169)
 at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:121)
 at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserializeWithSchemaAndVersion(AbstractKafkaAvroDeserializer.java:243)
 at io.confluent.connect.avro.AvroConverter$Deserializer.deserialize(AvroConverter.java:134)
 at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:85)
 at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$0(WorkerSinkTask.java:510)
 at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
 at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
 at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
 at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:510)
 at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:490)
 at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:321)
 at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:225)
 at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:193)
 at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
 at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
 at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
 at java.util.concurrent.FutureTask.run(FutureTask.java:266)
 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
 at java.lang.Thread.run(Thread.java:748)

Что для нас означает, что Kafka Connect читает сообщение, декодирует AVRO и пытается извлечь схему с идентификатором 0 изреестр схемы. Очевидно, что идентификаторы схемы в реестре схемы всегда> 0.

В настоящее время мы пытаемся определить проблему здесь. Похоже, что KSQL кодирует сообщение с идентификатором схемы 0, но мы не можем найти причину для этого: /

Любая помощь приветствуется!

BR, Патрик

ОБНОВЛЕНИЕ: Мы внедрили базового потребителя для сообщений AVRO, и этот потребитель правильно идентифицирует схему в сообщениях AVRO (ID: 3), так что, похоже, он восходит к Kafka Connect, а не к реальным сообщениям KSQL / AVRO. .

1 Ответ

1 голос
/ 31 октября 2019

Очевидно, идентификаторы схемы в реестре схемы всегда> 0 ... Похоже, что KSQL кодирует сообщение с идентификатором схемы 0, но мы не можем найти причину для этого

AvroConverter выполняет «тупую проверку», при которой только выглядит, что использованные байты начинаются с магического байта 0x0. Следующие 4 байта являются идентификатором.

Если вы используете key.converter=AvroConverter, и ваши ключи начинаются как 0x00000 в шестнадцатеричном формате, тогда идентификатор будет показан как 0 в журналах, и поиск не удастся.

В последний раз, когда я проверял, KSQL не выводит ключи в формате Avro, поэтому вы захотите проверить свойства вашего коннектора.

...