происходит что-то странное, когда мы создаем сообщения AVRO через KSQL и пытаемся использовать их с помощью Kafka Connect. Немного контекста:
Исходные данные Сторонний поставщик создает данные для одного из наших кластеров Kafka в формате JSON (пока что это хорошо). Фактически мы видим поступающие данные.
Преобразование данных Поскольку нашим внутренним системам требуется кодировать данные в AVRO, мы создали кластер KSQL, который преобразует входящие данные в AVRO, создавая следующеепоток в KSQL:
{
"ksql": "
CREATE STREAM src_stream (browser_name VARCHAR)
WITH (KAFKA_TOPIC='json_topic', VALUE_FORMAT='JSON');
CREATE STREAM sink_stream WITH (KAFKA_TOPIC='avro_topic',VALUE_FORMAT='AVRO', PARTITIONS=1, REPLICAS=3) AS
SELECT * FROM src_stream;
",
"streamsProperties": {
"ksql.streams.auto.offset.reset": "earliest"
}
}
(пока что все хорошо)
Мы видим данные, создаваемые из темы JSON в тему AVRO, по мере увеличения смещения.
Затем мы создаем соединитель Kafka в (новом) кластере Kafka Connect. В качестве некоторого контекста мы используем несколько кластеров Kafka Connect (с одинаковыми свойствами для этих кластеров), и поэтому у нас есть кластер Kafka Connect, работающий для этих данных, но точная копия кластера для других данных AVRO (1 дляаналитика, 1 для наших бизнес-данных).
Раковина для этого разъема - BigQuery, мы используем Wepay BigQuery Sink Connector 1.2.0. Опять, пока, все хорошо. Наш бизнес-кластер работает нормально с этим соединителем, а темы AVRO в бизнес-кластере перенаправляются в BigQuery.
Однако, когда мы пытаемся использовать тему AVRO, созданную нашим оператором KSQL ранее, мы видим исключение: /
Исключением является следующее:
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:510)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:490)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:321)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:225)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:193)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.connect.errors.DataException: dpt_video_event-created_v2
at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:98)
at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$0(WorkerSinkTask.java:510)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
... 13 more
Caused by: org.apache.kafka.common.errors.SerializationException: Error retrieving Avro schema for id 0
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema not found; error code: 40403
at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:209)
at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:235)
at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:415)
at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:408)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaByIdFromRegistry(CachedSchemaRegistryClient.java:123)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getBySubjectAndId(CachedSchemaRegistryClient.java:190)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getById(CachedSchemaRegistryClient.java:169)
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:121)
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserializeWithSchemaAndVersion(AbstractKafkaAvroDeserializer.java:243)
at io.confluent.connect.avro.AvroConverter$Deserializer.deserialize(AvroConverter.java:134)
at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:85)
at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$0(WorkerSinkTask.java:510)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:510)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:490)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:321)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:225)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:193)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Что для нас означает, что Kafka Connect читает сообщение, декодирует AVRO и пытается извлечь схему с идентификатором 0 изреестр схемы. Очевидно, что идентификаторы схемы в реестре схемы всегда> 0.
В настоящее время мы пытаемся определить проблему здесь. Похоже, что KSQL кодирует сообщение с идентификатором схемы 0, но мы не можем найти причину для этого: /
Любая помощь приветствуется!
BR, Патрик
ОБНОВЛЕНИЕ: Мы внедрили базового потребителя для сообщений AVRO, и этот потребитель правильно идентифицирует схему в сообщениях AVRO (ID: 3), так что, похоже, он восходит к Kafka Connect, а не к реальным сообщениям KSQL / AVRO. .