Connecting Kafka and HDFS - Wrong FS
January 22, 2020

Namenode container:

 core-site.xml:
    <configuration>
      <property><name>hadoop.proxyuser.hue.hosts</name><value>*</value></property>
      <property><name>fs.defaultFS</name><value>hdfs://namenode:9000</value></property>
      <property><name>hadoop.http.staticuser.user</name><value>root</value></property>
      <property><name>io.compression.codecs</name><value>org.apache.hadoop.io.compress.SnappyCodec</value></property>
      <property><name>hadoop.proxyuser.hue.groups</name><value>*</value></property>
    </configuration>

 hdfs-site.xml:
    <configuration>
      <property><name>dfs.namenode.datanode.registration.ip-hostname-check</name><value>false</value></property>
      <property><name>dfs.webhdfs.enabled</name><value>true</value></property>
      <property><name>dfs.permissions.enabled</name><value>false</value></property>
      <property><name>dfs.namenode.name.dir</name><value>file:///hadoop/dfs/name</value></property>
      <property><name>dfs.namenode.rpc-bind-host</name><value>0.0.0.0</value></property>
      <property><name>dfs.namenode.servicerpc-bind-host</name><value>0.0.0.0</value></property>
      <property><name>dfs.namenode.http-bind-host</name><value>0.0.0.0</value></property>
      <property><name>dfs.namenode.https-bind-host</name><value>0.0.0.0</value></property>
      <property><name>dfs.client.use.datanode.hostname</name><value>true</value></property>
      <property><name>dfs.datanode.use.datanode.hostname</name><value>true</value></property>
    </configuration>
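
To double-check that clients really resolve the default filesystem as hdfs://namenode:9000, a quick sanity check can be run with the Hadoop CLI; the container name "namenode" and the hdfs binary being on the PATH inside it are assumptions here:

    # print the effective fs.defaultFS as seen inside the namenode container
    docker exec -it namenode hdfs getconf -confKey fs.defaultFS
    # expected output: hdfs://namenode:9000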

Kafka Connect container:

     Plugins:
       HTTP/1.1 200 OK
       Date: Wed, 22 Jan 2020 15:04:22 GMT
       Content-Type: application/json
       Content-Length: 1284
       Server: Jetty(9.4.20.v20190813)

[{"class":"io.confluent.connect.hdfs.HdfsSinkConnector","type":"sink","version":"5.4.0"},{"class":"io.confluent.connect.hdfs.tools.SchemaSourceConnector","type":"source","version":"2.4.0"},{"class":"io.confluent.connect.storage.tools.SchemaSourceConnector","type":"source","version":"2.4.0"},{"class":"io.debezium.connector.mongodb.MongoDbConnector","type":"source","version":"1.0.0.Final"},{"class":"io.debezium.connector.mysql.MySqlConnector","type":"source","version":"1.0.0.Final"},{"class":"io.debezium.connector.oracle.OracleConnector","type":"source","version":"1.0.0.Final"},{"class":"io.debezium.connector.postgresql.PostgresConnector","type":"source","version":"1.0.0.Final"},{"class":"io.debezium.connector.sqlserver.SqlServerConnector","type":"source","version":"1.0.0.Final"},{"class":"org.apache.kafka.connect.file.FileStreamSinkConnector","type":"sink","version":"2.4.0"},{"class":"org.apache.kafka.connect.file.FileStreamSourceConnector","type":"source","version":"2.4.0"},{"class":"org.apache.kafka.connect.mirror.MirrorCheckpointConnector","type":"source","version":"1"},{"class":"org.apache.kafka.connect.mirror.MirrorHeartbeatConnector","type":"source","version":"1"},{"class":"org.apache.kafka.connect.mirror.MirrorSourceConnector","type":"source","version":"1"}][root@debezium docker-hadoop-master]#

The hdfs-sink connector:

{"name":"hdfs-sink","config":{"connector.class":"io.confluent.connect.hdfs.HdfsSinkConnector","tasks.max":"1","topics":"dbserver1,dbserver1.inventory.products,dbserver1.inventory.products_on_hand,dbserver1.inventory.customrs,dbserver1.inventory.orders, dbserver1.inventory.geom,dbserver1.inventory.addresses","store.url":"hdfs://namenode:9000","flush.size":"3","logs.dir":"logs","topics.dir":"kafka","format.class":"io.confluent.connect.hdfs.sting.StringFormat","partitioner.class":"io.confluent.connect.hdfs.partitioner.DefaultPartitioner","partition.field.name":"day","request.timeout.ms":"310000","heartbeat.interval.ms":"60000","consumer.session.timeout.ms":"30000","max.poll.records":"10000","hadoop.conf.dir":"/opt/hadoop-3.1.3/etc/hadoop","offsets.retention.minutes":"1440","name":"hdfs-sink"},"tasks":[{"connector":"hdfs-sink","task":0}],"type":"sink"}[root@debezium docker-hadoopmaster]

Environment variables:

KAFKA_HOME=/kafka
HOSTNAME=20a482382ba9
CONFIG_STORAGE_TOPIC=my_connect_configs
TERM=xterm
HISTSIZE=1000
AVRO_JACKSON_VERSION=1.9.13
SCALA_VERSION=2.12
CORE_CONF_fs_defaultFS=hdfs://namenode:9000
KAFKA_CONNECT_PLUGINS_DIR=/kafka/connect
USER=root
POSTGRES_MD5=955675a36c7af5c2cf7f9f85d4abac3b
MAVEN_REPO_INCUBATOR=
MYSQL_MD5=41820a2e872676d4e9acaeb3cb2b2c13
CONNECT_PLUGIN_PATH=
MAIL=/var/spool/mail/root
PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin
KAFKA_VERSION=2.4.0
PWD=/kafka
MAVEN_REPO_CENTRAL=
JAVA_HOME=/etc/alternatives/jre
ORACLE_MD5=6557bb454dcef2deb4aeb4e148cc313f
GROUP_ID=1
HADOOP_CONF_DIR=/opt/hadoop-3.1.3/etc/hadoop
DEBEZIUM_VERSION=1.0.0.Final
HISTCONTROL=ignoredups
JAVA_MAJOR_VERSION=11
BOOTSTRAP_SERVERS=172.18.0.10:9092
SQLSERVER_MD5=f86d8d403a3265cb4f550ed5ddc457d7
SHLVL=1
HOME=/root
JAVA_APP_DIR=/deployments
STATUS_STORAGE_TOPIC=my_connect_statuses
LOGNAME=root
MONGODB_MD5=921d69952d2277b9664c69d2e4fbac9c
SHA512HASH=53B52F86EA56C9FAC62046524F03F75665A089EA2DAE554AEFE3A3D2694F2DA88B5BA8725D8BE55F198BA80695443559ED9DE7C0B2A2817F7A6141008FF79F49
CONFLUENT_VERSION=5.1.2
MAVEN_DEP_DESTINATION=/kafka/connect
OFFSET_STORAGE_TOPIC=my_connect_offsets
AVRO_VERSION=1.8.2
KAFKA_URL_PATH=kafka/2.4.0/kafka_2.12-2.4.0.tgz
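
Since the connector's hadoop.conf.dir and the HADOOP_CONF_DIR variable both point at /opt/hadoop-3.1.3/etc/hadoop, the core-site.xml that the sink actually reads can be inspected inside the Kafka Connect container; "connect" below is just a placeholder for whatever the container is actually called:

    # show the default filesystem in the client config used by the hdfs-sink connector
    docker exec -it connect grep -A1 'fs.defaultFS' /opt/hadoop-3.1.3/etc/hadoop/core-site.xml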

Kafka Connect log:

2020-01-22 14:41:51,716 INFO   ||  EnrichedConnectorConfig values:
        config.action.reload = restart
        connector.class = io.confluent.connect.hdfs.HdfsSinkConnector
        errors.deadletterqueue.context.headers.enable = false
        errors.deadletterqueue.topic.name =
        errors.deadletterqueue.topic.replication.factor = 3
        errors.log.enable = false
        errors.log.include.messages = false
        errors.retry.delay.max.ms = 60000
        errors.retry.timeout = 0
        errors.tolerance = none
        header.converter = null
        key.converter = null
        name = hdfs-sink
        tasks.max = 1
        topics = [dbserver1, dbserver1.inventory.products, dbserver1.inventory.products_on_hand, dbserver1.inventory.customers, dbserver1.inventory.orders, dbserver1.inventory.geom, dbserver1.inventory.addresses]
        topics.regex =
        transforms = []
        value.converter = null
   [org.apache.kafka.connect.runtime.ConnectorConfig$EnrichedConnectorConfig]
2020-01-22 14:41:51,716 INFO   ||  ConsumerConfig values:
        allow.auto.create.topics = true
        auto.commit.interval.ms = 5000
        auto.offset.reset = earliest
        bootstrap.servers = [172.18.0.10:9092]
        check.crcs = true
        client.dns.lookup = default
        client.id = connector-consumer-hdfs-sink-0
        client.rack =
        connections.max.idle.ms = 540000
        default.api.timeout.ms = 60000
        enable.auto.commit = false
        exclude.internal.topics = true
        fetch.max.bytes = 52428800
        fetch.max.wait.ms = 500
        fetch.min.bytes = 1
        group.id = connect-hdfs-sink
        group.instance.id = null
        heartbeat.interval.ms = 3000
        interceptor.classes = []
        internal.leave.group.on.close = true
        isolation.level = read_uncommitted
        key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
        max.partition.fetch.bytes = 1048576
        max.poll.interval.ms = 300000
        max.poll.records = 500
        metadata.max.age.ms = 300000
        metric.reporters = []
        metrics.num.samples = 2
        metrics.recording.level = INFO
        metrics.sample.window.ms = 30000
        partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
        receive.buffer.bytes = 65536
        reconnect.backoff.max.ms = 1000
        reconnect.backoff.ms = 50
        request.timeout.ms = 30000
        retry.backoff.ms = 100
        sasl.client.callback.handler.class = null
        sasl.jaas.config = null
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        sasl.kerberos.min.time.before.relogin = 60000
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        sasl.kerberos.ticket.renew.window.factor = 0.8
        sasl.login.callback.handler.class = null
        sasl.login.class = null
        sasl.login.refresh.buffer.seconds = 300
        sasl.login.refresh.min.period.seconds = 60
        sasl.login.refresh.window.factor = 0.8
        sasl.login.refresh.window.jitter = 0.05
        sasl.mechanism = GSSAPI
        security.protocol = PLAINTEXT
        security.providers = null
        send.buffer.bytes = 131072
        session.timeout.ms = 10000
        ssl.cipher.suites = null
        ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
        ssl.endpoint.identification.algorithm = https
        ssl.key.password = null
        ssl.keymanager.algorithm = SunX509
        ssl.keystore.location = null
        ssl.keystore.password = null
        ssl.keystore.type = JKS
        ssl.protocol = TLS
        ssl.provider = null
        ssl.secure.random.implementation = null
        ssl.trustmanager.algorithm = PKIX
        ssl.truststore.location = null
        ssl.truststore.password = null
        ssl.truststore.type = JKS
        value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
   [org.apache.kafka.clients.consumer.ConsumerConfig]
2020-01-22 14:41:51,718 INFO   ||  Kafka version: 2.4.0   [org.apache.kafka.common.utils.AppInfoParser]
2020-01-22 14:41:51,718 INFO   ||  Kafka commitId: 77a89fcf8d7fa018   [org.apache.kafka.common.utils.AppInfoParser]
2020-01-22 14:41:51,718 INFO   ||  Kafka startTimeMs: 1579704111718   [org.apache.kafka.common.utils.AppInfoParser]
2020-01-22 14:41:51,719 INFO   ||  [Worker clientId=connect-1, groupId=1] Finished starting connectors and tasks   [org.apache.kafka.connect.runtime.distributed.DistributedHerder]
2020-01-22 14:41:51,722 INFO   ||  [Consumer clientId=connector-consumer-hdfs-sink-0, groupId=connect-hdfs-sink] Subscribed to topic(s): dbserver1, dbserver1.inventory.products, dbserver1.inventory.products_on_hand, dbserver1.inventory.customers, dbserver1.inventory.orders, dbserver1.inventory.geom, dbserver1.inventory.addresses   [org.apache.kafka.clients.consumer.KafkaConsumer]
2020-01-22 14:41:51,723 INFO   ||  HdfsSinkConnectorConfig values:
        avro.codec = null
        connect.hdfs.keytab =
        connect.hdfs.principal =
        connect.meta.data = true
        enhanced.avro.schema.support = false
        filename.offset.zero.pad.width = 10
        flush.size = 3
        format.class = class io.confluent.connect.hdfs.string.StringFormat
        hadoop.conf.dir = /opt/hadoop-3.1.3/etc/hadoop
        hadoop.home =
        hdfs.authentication.kerberos = false
        hdfs.namenode.principal =
        hdfs.url = null
        kerberos.ticket.renew.period.ms = 3600000
        logs.dir = logs
        retry.backoff.ms = 5000
        rotate.interval.ms = -1
        rotate.schedule.interval.ms = -1
        schema.cache.size = 1000
        schema.compatibility = NONE
        shutdown.timeout.ms = 3000
   [io.confluent.connect.hdfs.HdfsSinkConnectorConfig]
2020-01-22 14:41:51,723 INFO   ||  StorageCommonConfig values:
        directory.delim = /
        file.delim = +
        storage.class = class io.confluent.connect.hdfs.storage.HdfsStorage
        store.url = hdfs://namenode:9000
        topics.dir = kafka
   [io.confluent.connect.storage.common.StorageCommonConfig]
2020-01-22 14:41:51,723 INFO   ||  HiveConfig values:
        hive.conf.dir =
        hive.database = default
        hive.home =
        hive.integration = false
        hive.metastore.uris =
   [io.confluent.connect.storage.hive.HiveConfig]
2020-01-22 14:41:51,723 INFO   ||  PartitionerConfig values:
        locale =
        partition.duration.ms = -1
        partition.field.name = [day]
        partitioner.class = class io.confluent.connect.hdfs.partitioner.DefaultPartitioner
        path.format =
        timestamp.extractor = Wallclock
        timestamp.field = timestamp
        timezone =
   [io.confluent.connect.storage.partitioner.PartitionerConfig]
2020-01-22 14:41:51,723 INFO   ||  AvroDataConfig values:
        connect.meta.data = true
        enhanced.avro.schema.support = false
        schemas.cache.config = 1000
   [io.confluent.connect.avro.AvroDataConfig]
2020-01-22 14:41:51,723 INFO   ||  Hadoop configuration directory /opt/hadoop-3.1.3/etc/hadoop   [io.confluent.connect.hdfs.DataWriter]
2020-01-22 14:41:51,776 INFO   ||  The connector relies on offsets in HDFS filenames, but does commit these offsets to Connect to enable monitoring progress of the HDFS connector. Upon startup, the HDFS Connector restores offsets from filenames in HDFS. In the absence of files in HDFS, the connector will attempt to find offsets for its consumer group in the '__consumer_offsets' topic. If offsets are not found, the consumer will rely on the reset policy specified in the 'consumer.auto.offset.reset' property to start exporting data to HDFS.   [io.confluent.connect.hdfs.HdfsSinkTask]
2020-01-22 14:41:51,776 INFO   ||  WorkerSinkTask{id=hdfs-sink-0} Sink task finished initialization and start   [org.apache.kafka.connect.runtime.WorkerSinkTask]
2020-01-22 14:41:51,782 INFO   ||  [Consumer clientId=connector-consumer-hdfs-sink-0, groupId=connect-hdfs-sink] Cluster ID: 0TvMS3DVTIur7n7N3uOQpw   [org.apache.kafka.clients.Metadata]
2020-01-22 14:41:51,782 INFO   ||  [Consumer clientId=connector-consumer-hdfs-sink-0, groupId=connect-hdfs-sink] Discovered group coordinator 172.18.0.10:9092 (id: 2147483646 rack: null)   [org.apache.kafka.clients.consumer.internals.AbstractCoordinator]
2020-01-22 14:41:51,789 INFO   ||  [Consumer clientId=connector-consumer-hdfs-sink-0, groupId=connect-hdfs-sink] (Re-)joining group   [org.apache.kafka.clients.consumer.internals.AbstractCoordinator]
2020-01-22 14:41:51,791 INFO   ||  [Consumer clientId=connector-consumer-hdfs-sink-0, groupId=connect-hdfs-sink] (Re-)joining group   [org.apache.kafka.clients.consumer.internals.AbstractCoordinator]
2020-01-22 14:41:51,793 INFO   ||  [Consumer clientId=connector-consumer-hdfs-sink-0, groupId=connect-hdfs-sink] Finished assignment for group at generation 3: {connector-consumer-hdfs-sink-0-423aaedf-7e73-4d83-866d-53f41f440ec5=org.apache.kafka.clients.consumer.ConsumerPartitionAssignor$Assignment@7d32b3}   [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator]
2020-01-22 14:41:51,795 INFO   ||  [Consumer clientId=connector-consumer-hdfs-sink-0, groupId=connect-hdfs-sink] Successfully joined group with generation 3   [org.apache.kafka.clients.consumer.internals.AbstractCoordinator]
2020-01-22 14:41:51,795 INFO   ||  [Consumer clientId=connector-consumer-hdfs-sink-0, groupId=connect-hdfs-sink] Adding newly assigned partitions: dbserver1-0, dbserver1.inventory.geom-0, dbserver1.inventory.products-0, dbserver1.inventory.customers-0, dbserver1.inventory.orders-0, dbserver1.inventory.products_on_hand-0, dbserver1.inventory.addresses-0   [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator]
2020-01-22 14:41:51,796 INFO   ||  [Consumer clientId=connector-consumer-hdfs-sink-0, groupId=connect-hdfs-sink] Found no committed offset for partition dbserver1-0   [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator]
2020-01-22 14:41:51,796 INFO   ||  [Consumer clientId=connector-consumer-hdfs-sink-0, groupId=connect-hdfs-sink] Found no committed offset for partition dbserver1.inventory.geom-0   [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator]
2020-01-22 14:41:51,796 INFO   ||  [Consumer clientId=connector-consumer-hdfs-sink-0, groupId=connect-hdfs-sink] Found no committed offset for partition dbserver1.inventory.products-0   [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator]
2020-01-22 14:41:51,796 INFO   ||  [Consumer clientId=connector-consumer-hdfs-sink-0, groupId=connect-hdfs-sink] Found no committed offset for partition dbserver1.inventory.orders-0   [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator]
2020-01-22 14:41:51,796 INFO   ||  [Consumer clientId=connector-consumer-hdfs-sink-0, groupId=connect-hdfs-sink] Found no committed offset for partition dbserver1.inventory.customers-0   [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator]
2020-01-22 14:41:51,796 INFO   ||  [Consumer clientId=connector-consumer-hdfs-sink-0, groupId=connect-hdfs-sink] Found no committed offset for partition dbserver1.inventory.products_on_hand-0   [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator]
2020-01-22 14:41:51,796 INFO   ||  [Consumer clientId=connector-consumer-hdfs-sink-0, groupId=connect-hdfs-sink] Found no committed offset for partition dbserver1.inventory.addresses-0   [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator]
2020-01-22 14:41:51,799 INFO   ||  [Consumer clientId=connector-consumer-hdfs-sink-0, groupId=connect-hdfs-sink] Resetting offset for partition dbserver1-0 to offset 0.   [org.apache.kafka.clients.consumer.internals.SubscriptionState]
2020-01-22 14:41:51,799 INFO   ||  [Consumer clientId=connector-consumer-hdfs-sink-0, groupId=connect-hdfs-sink] Resetting offset for partition dbserver1.inventory.geom-0 to offset 0.   [org.apache.kafka.clients.consumer.internals.SubscriptionState]
2020-01-22 14:41:51,799 INFO   ||  [Consumer clientId=connector-consumer-hdfs-sink-0, groupId=connect-hdfs-sink] Resetting offset for partition dbserver1.inventory.products-0 to offset 0.   [org.apache.kafka.clients.consumer.internals.SubscriptionState]
2020-01-22 14:41:51,799 INFO   ||  [Consumer clientId=connector-consumer-hdfs-sink-0, groupId=connect-hdfs-sink] Resetting offset for partition dbserver1.inventory.customers-0 to offset 0.   [org.apache.kafka.clients.consumer.internals.SubscriptionState]
2020-01-22 14:41:51,799 INFO   ||  [Consumer clientId=connector-consumer-hdfs-sink-0, groupId=connect-hdfs-sink] Resetting offset for partition dbserver1.inventory.orders-0 to offset 0.   [org.apache.kafka.clients.consumer.internals.SubscriptionState]
2020-01-22 14:41:51,799 INFO   ||  [Consumer clientId=connector-consumer-hdfs-sink-0, groupId=connect-hdfs-sink] Resetting offset for partition dbserver1.inventory.products_on_hand-0 to offset 0.   [org.apache.kafka.clients.consumer.internals.SubscriptionState]
2020-01-22 14:41:51,799 INFO   ||  [Consumer clientId=connector-consumer-hdfs-sink-0, groupId=connect-hdfs-sink] Resetting offset for partition dbserver1.inventory.addresses-0 to offset 0.   [org.apache.kafka.clients.consumer.internals.SubscriptionState]
2020-01-22 14:41:51,800 INFO   ||  Started recovery for topic partition dbserver1-0   [io.confluent.connect.hdfs.TopicPartitionWriter]
2020-01-22 14:41:51,821 INFO   ||  Successfully acquired lease for hdfs://namenode:9000/logs/dbserver1/0/log   [io.confluent.connect.hdfs.wal.FSWAL]
2020-01-22 14:41:51,870 WARN   ||  WorkerSinkTask{id=hdfs-sink-0} Offset commit failed during close   [org.apache.kafka.connect.runtime.WorkerSinkTask]
2020-01-22 14:41:51,870 ERROR  ||  WorkerSinkTask{id=hdfs-sink-0} Commit of offsets threw an unexpected exception for sequence number 1: null   [org.apache.kafka.connect.runtime.WorkerSinkTask]
java.lang.NullPointerException
        at io.confluent.connect.hdfs.DataWriter.getCommittedOffsets(DataWriter.java:520)
        at io.confluent.connect.hdfs.HdfsSinkTask.preCommit(HdfsSinkTask.java:141)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:379)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.closePartitions(WorkerSinkTask.java:591)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:196)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:834)
2020-01-22 14:41:51,965 ERROR  ||  WorkerSinkTask{id=hdfs-sink-0} Task threw an uncaught and unrecoverable exception   [org.apache.kafka.connect.runtime.WorkerTask]
java.lang.IllegalArgumentException: Wrong FS: hdfs://172.17.0.7:9000/kafka/dbserver1/partition=0/dbserver1+0+0000000006+0000000008.parquet, expected: hdfs://namenode:9000
        at org.apache.hadoop.fs.FileSystem.checkPath(FileSystem.java:649)
        at org.apache.hadoop.hdfs.DistributedFileSystem.getPathName(DistributedFileSystem.java:194)
        at org.apache.hadoop.hdfs.DistributedFileSystem.access$000(DistributedFileSystem.java:106)
        at org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1305)
        at org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1301)
        at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
        at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1317)
        at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1426)
        at io.confluent.connect.hdfs.storage.HdfsStorage.exists(HdfsStorage.java:149)
        at io.confluent.connect.hdfs.wal.FSWAL.apply(FSWAL.java:123)
        at io.confluent.connect.hdfs.TopicPartitionWriter.applyWAL(TopicPartitionWriter.java:649)
        at io.confluent.connect.hdfs.TopicPartitionWriter.recover(TopicPartitionWriter.java:260)
        at io.confluent.connect.hdfs.DataWriter.recover(DataWriter.java:380)
        at io.confluent.connect.hdfs.DataWriter.open(DataWriter.java:444)
        at io.confluent.connect.hdfs.HdfsSinkTask.open(HdfsSinkTask.java:155)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.openPartitions(WorkerSinkTask.java:587)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.access$1100(WorkerSinkTask.java:67)
        at org.apache.kafka.connect.runtime.WorkerSinkTask$HandleRebalance.onPartitionsAssigned(WorkerSinkTask.java:652)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokePartitionsAssigned(ConsumerCoordinator.java:272)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:400)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:421)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:340)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:471)
        at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1267)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1231)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.pollConsumer(WorkerSinkTask.java:444)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:317)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:834)
2020-01-22 14:41:51,979 ERROR  ||  WorkerSinkTask{id=hdfs-sink-0} Task is being killed and will not recover until manually restarted   [org.apache.kafka.connect.runtime.WorkerTask]
2020-01-22 14:41:51,981 INFO   ||  [Consumer clientId=connector-consumer-hdfs-sink-0, groupId=connect-hdfs-sink] Revoke previously assigned partitions dbserver1-0, dbserver1.inventory.geom-0, dbserver1.inventory.products-0, dbserver1.inventory.customers-0, dbserver1.inventory.orders-0, dbserver1.inventory.products_on_hand-0, dbserver1.inventory.addresses-0   [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator]
2020-01-22 14:41:51,982 INFO   ||  [Consumer clientId=connector-consumer-hdfs-sink-0, groupId=connect-hdfs-sink] Member connector-consumer-hdfs-sink-0-423aaedf-7e73-4d83-866d-53f41f440ec5 sending LeaveGroup request to coordinator 172.18.0.10:9092 (id: 2147483646 rack: null) due to the consumer is being closed   [org.apache.kafka.clients.consumer.internals.AbstractCoordinator]
2020-01-22 14:41:52,342 INFO   ||  WorkerSourceTask{id=inventory-connector-0} Committing offsets   [org.apache.kafka.connect.runtime.WorkerSourceTask]
2020-01-22 14:41:52,343 INFO   ||  WorkerSourceTask{id=inventory-connector-0} flushing 0 outstanding messages for offset commit   [org.apache.kafka.connect.runtime.WorkerSourceTask]

I don't know what is causing this error:

[Consumer clientId=connector-consumer-hdfs-sink-0, groupId=connect-hdfs-sink] Found no committed offset for partition
[Consumer clientId=connector-consumer-hdfs-sink-0, groupId=connect-hdfs-sink] Resetting offset for partition

and then this one:

WorkerSinkTask{id=hdfs-sink-0} Task threw an uncaught and unrecoverable exception   [org.apache.kafka.connect.runtime.WorkerTask]
java.lang.IllegalArgumentException: Wrong FS: hdfs://172.17.0.7:9000/kafka/dbserver1/partition=0/dbserver1+0+0000000006+0000000008.parquet, expected: hdfs://namenode:9000
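
From the stack trace it looks like FSWAL.apply replays WAL entries whose file names were recorded with the old authority hdfs://172.17.0.7:9000, while the FileSystem handle is now bound to hdfs://namenode:9000, so the checkPath call rejects them. To see what is actually stored there, the WAL and data directories from the log can be listed roughly like this (container name assumed, paths taken from logs.dir=logs and topics.dir=kafka):

    # recursively list the connector's WAL directory and the exported data directory
    docker exec -it namenode hdfs dfs -ls -R hdfs://namenode:9000/logs hdfs://namenode:9000/kafka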

Do you know how I can fix this?

If anything here looks completely wrong, that's to be expected: I'm still learning, and I'm counting on your expertise here. Thanks in advance!

...