Я пытаюсь использовать образ Confluent kafka-connect для подключения к нашему локальному (on-prem) S3. Мы успешно записали данные в S3 с этой же машины с помощью Boto3, так что знаем, что проблема не в сетевом подключении.
В зависимости от того, какие конвертеры я использую, возникают разные ошибки.
Вот переменные среды, заданные в контейнере Docker:
CONNECT_CONFIG_STORAGE_TOPIC=__kafka-connect-config
CONNECT_OFFSET_STORAGE_TOPIC=__kafka-connect-offsets
CONNECT_STATUS_STORAGE_TOPIC=__kafka-connect-status
CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR=3
CONNECT_CONFIG_STORAGE_PARTITIONS=1
CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR=3
CONNECT_OFFSET_STORAGE_PARTITIONS=1
CONNECT_STATUS_STORAGE_REPLICATION_FACTOR=3
CONNECT_STATUS_STORAGE_PARTITIONS=1
CONNECT_REST_ADVERTISED_HOST_NAME=hostname
CONNECT_REST_ADVERTIZED_LISTENER=listener
CONNECT_INTERNAL_KEY_CONVERTER=org.apache.kafka.connect.json.JsonConverter
CONNECT_INTERNAL_VALUE_CONVERTER=org.apache.kafka.connect.json.JsonConverter
CONNECT_KEY_CONVERTER=org.apache.kafka.connect.json.JsonConverter
CONNECT_VALUE_CONVERTER=io.confluent.connect.avro.AvroConverter
CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL=http://schema-registry:8081
CONNECT_KEY_CONVERTER_SCHEMAS_ENABLED=false
CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLED=true
CONNECT_REST_ADVERTISED_PORT=8083
CONNECT_REPLICATION_FACTOR=2
CONNECT_GROUP_ID=APP-CONNECT
CONNECT_CONSUMER_BOOTSTRAP_SERVERS=SASL_SSL://server-1.com:9092,SASL_SSL://server-2.com:9092,SASL_SSL://server-3.com:9092
CONNECT_BOOTSTRAP_SERVERS=SASL_SSL://server-1.com:9092,SASL_SSL://server-2.com:9092,SASL_SSL://server-3.com:9092
CONNECT_CONSUMER_SECURITY_PROTOCOL=SASL_SSL
CONNECT_CONSUMER_SASL_JAAS_CONFIG=org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='pw';
CONNECT_CONSUMER_SSL_PROTOCOL=SSL
CONNECT_CONSUMER_SSL_TRUSTSTORE_LOCATION=/etc/kafka/secrets/kafka.client.truststore.jks
CONNECT_CONSUMER_SSL_TRUSTSTORE_PASSWORD=password
CONNECT_CONSUMER_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM=
CONNECT_CONSUMER_SASL_MECHANISM=PLAIN
CONNECT_LOG4J_OPTS=-Dlog4j.configuration=file:/etc/kafka_connect/log4j/log4j.properties
CONNECT_OFFSET_FLUSH_INTERVAL_MS=10000
CONNECT_PLUGIN_PATH=/usr/share/java,/usr/share/confluent-hub-components
CONNECT_REST_PORT=8083
CONNECT_SECURITY_PROTOCOL=SASL_SSL
CONNECT_SASL_JAAS_CONFIG=org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='pw';
CONNECT_SASL_MECHANISM=PLAIN
CONNECT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM=
CONNECT_SSL_PROTOCOL=SSL
CONNECT_SSL_TRUSTSTORE_LOCATION=/etc/kafka/secrets/kafka.client.truststore.jks
CONNECT_SSL_TRUSTSTORE_PASSWORD=password
CONNECT_ZOOKEEPER_CONNECT=SASL_SSL://server-1.com:9092,SASL_SSL://server-2.com:9092,SASL_SSL://server-3.com:9092
{
"connector.class": "io.confluent.connect.s3.S3SinkConnector",
"consumer.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='pw';",
"flush.size": "1500",
"topics": "inventory",
"tasks.max": "2",
"rotate.interval.ms": "1000",
"consumer.sasl.mechanism": "PLAIN",
"store.url": "http://s3-server:9020",
"format.class": "io.confluent.connect.s3.format.avro.AvroFormat",
"internal.key.converter.schemas.enable": "false",
"internal.value.converter.schemas.enable": "false",
"value.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter.schema.registry.url": "http://schema-registry:8081",
"key.converter.schemas.enabled": "false",
"value.converter.schemas.enabled": "true",
"partitioner.class": "io.confluent.connect.storage.partitioner.DefaultPartitioner",
"schema.generator.class": "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
"name": "inventory-2",
"consumer.security.protocol": "SASL_SSL",
"storage.class": "io.confluent.connect.s3.storage.S3Storage",
"s3.bucket.name": "inventory-stage"
}
С приведённой выше конфигурацией коннектора запуск выглядит успешным. Однако, когда я проверяю бакет, в нём нет ни одного объекта. С помощью kafka-avro-console-consumer я подтвердил, что сообщения Avro в топике существуют.
[2019-04-11 18:14:52,612] INFO [Consumer clientId=consumer-42, groupId=connect-inventory-2] Resetting offset for partition inventory-0 to offset 9. (org.apache.kafka.clients.consumer.internals.Fetcher)
[2019-04-11 18:14:52,614] INFO Opening record writer for: topics/inventory/partition=2/inventory+2+0000000008.avro (io.confluent.connect.s3.format.avro.AvroRecordWriterProvider)
[2019-04-11 18:14:52,621] INFO [Consumer clientId=consumer-42, groupId=connect-inventory-2] Resetting offset for partition inventory-1 to offset 8. (org.apache.kafka.clients.consumer.internals.Fetcher)
[2019-04-11 18:14:52,621] WARN Property 'rotate.interval.ms' is set to '1000ms' but partitioner is not an instance of 'io.confluent.connect.storage.partitioner.TimeBasedPartitioner'. This property is ignored. (io.confluent.connect.s3.TopicPartitionWriter)
[2019-04-11 18:14:52,621] WARN Property 'rotate.interval.ms' is set to '1000ms' but partitioner is not an instance of 'io.confluent.connect.storage.partitioner.TimeBasedPartitioner'. This property is ignored. (io.confluent.connect.s3.TopicPartitionWriter)
[2019-04-11 18:14:52,626] INFO Opening record writer for: topics/inventory/partition=1/inventory+1+0000000008.avro (io.confluent.connect.s3.format.avro.AvroRecordWriterProvider)
[2019-04-11 18:14:52,645] INFO Opening record writer for: topics/inventory/partition=0/inventory+0+0000000009.avro (io.confluent.connect.s3.format.avro.AvroRecordWriterProvider)
Затем я поменял value.converter на AvroConverter, рассуждая так: сообщения записаны в формате Avro, и их нужно преобразовать, чтобы с ними мог работать API коннектора.
{
"connector.class": "io.confluent.connect.s3.S3SinkConnector",
"consumer.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='pw';",
"flush.size": "1500",
"topics": "inventory",
"tasks.max": "2",
"rotate.interval.ms": "1000",
"consumer.sasl.mechanism": "PLAIN",
"store.url": "http://s3-server:9020",
"format.class": "io.confluent.connect.s3.format.avro.AvroFormat",
"internal.key.converter.schemas.enable": "false",
"internal.value.converter.schemas.enable": "false",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://schema-registry:8081",
"key.converter.schemas.enabled": "false",
"value.converter.schemas.enabled": "true",
"partitioner.class": "io.confluent.connect.storage.partitioner.DefaultPartitioner",
"schema.generator.class": "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
"name": "inventory-2",
"consumer.security.protocol": "SASL_SSL",
"storage.class": "io.confluent.connect.s3.storage.S3Storage",
"s3.bucket.name": "inventory-stage"
}
В этом случае ошибка указывает на то, что AvroConverter не может получить схему с идентификатором 41. Однако схема с таким идентификатором существует в Schema Registry (см. ответ реестра после трассировки стека).
[2019-04-11 18:26:56,813] ERROR WorkerSinkTask{id=inventory-2-1} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask)
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:514)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:491)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:322)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:226)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:194)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.connect.errors.DataException: inventory
at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:103)
at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:514)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
... 13 more
Caused by: org.apache.kafka.common.errors.SerializationException: Error retrieving Avro schema for id 41
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Subject not found.; error code: 40401
at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:209)
at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:235)
at io.confluent.kafka.schemaregistry.client.rest.RestService.lookUpSubjectVersion(RestService.java:302)
at io.confluent.kafka.schemaregistry.client.rest.RestService.lookUpSubjectVersion(RestService.java:290)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getVersionFromRegistry(CachedSchemaRegistryClient.java:129)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getVersion(CachedSchemaRegistryClient.java:230)
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.schemaVersion(AbstractKafkaAvroDeserializer.java:184)
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:153)
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserializeWithSchemaAndVersion(AbstractKafkaAvroDeserializer.java:215)
at io.confluent.connect.avro.AvroConverter$Deserializer.deserialize(AvroConverter.java:139)
at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:87)
at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:514)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:514)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:491)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:322)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:226)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:194)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
[2019-04-11 18:26:56,814] ERROR WorkerSinkTask{id=inventory-2-1} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask)
[2019-04-11 18:26:56,815] INFO [Consumer clientId=consumer-44, groupId=connect-inventory-2] Sending LeaveGroup request to coordinator localhost:9092 (id: 2147483644 rack: null) (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
{
"subject": "inventory-com.company.dcp.event.schema.shotify.SongCreatedEvent",
"version": 1,
"id": 41,
"schema": "{\"type\":\"record\",\"name\":\"SongCreatedEvent\",\"namespace\":\"com.company.dcp.event.schema.shotify\",\"doc\":\"Information about the Song Added event\",\"fields\":[{\"name\":\"eventHeader\",\"type\":{\"type\":\"record\",\"name\":\"EventHeader\",\"namespace\":\"com.company.commons.shotify\",\"fields\":[{\"name\":\"id\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"}},{\"name\":\"time\",\"type\":{\"type\":\"long\",\"logicalType\":\"timestamp-millis\"}},{\"name\":\"type\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"}},{\"name\":\"source\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"}}]}},{\"name\":\"song\",\"type\":{\"type\":\"record\",\"name\":\"Song\",\"namespace\":\"com.company.commons.shotify\",\"fields\":[{\"name\":\"title\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"},\"doc\":\"Title of the Song\"},{\"name\":\"artist\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"},\"doc\":\"The song composer\"},{\"name\":\"duration\",\"type\":\"int\",\"doc\":\"Song Duration in minutes\"},{\"name\":\"bitrate\",\"type\":\"int\",\"doc\":\"Song Bitrate, measured in kilobytes per second\"},{\"name\":\"lyrics\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"},\"doc\":\"Lyrics of the Song\"},{\"name\":\"fileURL\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"},\"doc\":\"Unique file Reference to the song\"}]}}],\"version\":\"2\"}"
}