Namenode container:
Core-site:
<configuration>
<property><name>hadoop.proxyuser.hue.hosts</name><value>*</value></property>
<property><name>fs.defaultFS</name><value>hdfs://namenode:9000</value></property>
<property><name>hadoop.http.staticuser.user</name><value>root</value></property>
<property><name>io.compression.codecs</name><value>org.apache.hadoop.io.compress.SnappyCodec</value></property>
<property><name>hadoop.proxyuser.hue.groups</name><value>*</value></property>
</configuration>
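For context, here is a minimal Java sketch (my own addition, not part of the setup) that loads this core-site.xml and prints the resolved default filesystem; it should print hdfs://namenode:9000 if the file is picked up:

// Hypothetical verification snippet: load core-site.xml and print the resolved default FS.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class CheckDefaultFs {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Same directory the connector points at via hadoop.conf.dir (see config below).
        conf.addResource(new Path("file:///opt/hadoop-3.1.3/etc/hadoop/core-site.xml"));
        try (FileSystem fs = FileSystem.get(conf)) {
            System.out.println(fs.getUri()); // expect hdfs://namenode:9000
        }
    }
}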
Hdfs-site:
<configuration>
<property><name>dfs.namenode.datanode.registration.ip-hostname-check</name><value>false</value></property>
<property><name>dfs.webhdfs.enabled</name><value>true</value></property>
<property><name>dfs.permissions.enabled</name><value>false</value></property>
<property><name>dfs.namenode.name.dir</name><value>file:///hadoop/dfs/name</value></property>
<property><name>dfs.namenode.rpc-bind-host</name><value>0.0.0.0</value></property>
<property><name>dfs.namenode.servicerpc-bind-host</name><value>0.0.0.0</value></property>
<property><name>dfs.namenode.http-bind-host</name><value>0.0.0.0</value></property>
<property><name>dfs.namenode.https-bind-host</name><value>0.0.0.0</value></property>
<property><name>dfs.client.use.datanode.hostname</name><value>true</value></property>
<property><name>dfs.datanode.use.datanode.hostname</name><value>true</value></property>
</configuration>
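The hostname-related properties are there because the containers only reach each other by Docker hostnames, not IPs. A small hypothetical client snippet (again my own sketch, not from the setup) that applies the same options when listing the HDFS root:

// Hypothetical client snippet: connect to the namenode by hostname and prefer
// datanode hostnames over container IPs, mirroring the hdfs-site.xml above.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ListHdfsRoot {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://namenode:9000");
        conf.setBoolean("dfs.client.use.datanode.hostname", true);
        try (FileSystem fs = FileSystem.get(conf)) {
            for (FileStatus status : fs.listStatus(new Path("/"))) {
                System.out.println(status.getPath());
            }
        }
    }
}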
Kafka Connect container:
Plugins:
HTTP/1.1 200 OK
Date: Wed, 22 Jan 2020 15:04:22 GMT
Content-Type: application/json
Content-Length: 1284
Server: Jetty(9.4.20.v20190813)
[{"class":"io.confluent.connect.hdfs.HdfsSinkConnector","type":"sink","version":"5.4.0"},{"class":"io.confluent.connect.hdfs.tools.SchemaSourceConnector","type":"source","version":"2.4.0"},{"class":"io.confluent.connect.storage.tools.SchemaSourceConnector","type":"source","version":"2.4.0"},{"class":"io.debezium.connector.mongodb.MongoDbConnector","type":"source","version":"1.0.0.Final"},{"class":"io.debezium.connector.mysql.MySqlConnector","type":"source","version":"1.0.0.Final"},{"class":"io.debezium.connector.oracle.OracleConnector","type":"source","version":"1.0.0.Final"},{"class":"io.debezium.connector.postgresql.PostgresConnector","type":"source","version":"1.0.0.Final"},{"class":"io.debezium.connector.sqlserver.SqlServerConnector","type":"source","version":"1.0.0.Final"},{"class":"org.apache.kafka.connect.file.FileStreamSinkConnector","type":"sink","version":"2.4.0"},{"class":"org.apache.kafka.connect.file.FileStreamSourceConnector","type":"source","version":"2.4.0"},{"class":"org.apache.kafka.connect.mirror.MirrorCheckpointConnector","type":"source","version":"1"},{"class":"org.apache.kafka.connect.mirror.MirrorHeartbeatConnector","type":"source","version":"1"},{"class":"org.apache.kafka.connect.mirror.MirrorSourceConnector","type":"source","version":"1"}][root@debezium docker-hadoop-master]#
The hdfs-sink connector:
{"name":"hdfs-sink","config":{"connector.class":"io.confluent.connect.hdfs.HdfsSinkConnector","tasks.max":"1","topics":"dbserver1,dbserver1.inventory.products,dbserver1.inventory.products_on_hand,dbserver1.inventory.customrs,dbserver1.inventory.orders, dbserver1.inventory.geom,dbserver1.inventory.addresses","store.url":"hdfs://namenode:9000","flush.size":"3","logs.dir":"logs","topics.dir":"kafka","format.class":"io.confluent.connect.hdfs.sting.StringFormat","partitioner.class":"io.confluent.connect.hdfs.partitioner.DefaultPartitioner","partition.field.name":"day","request.timeout.ms":"310000","heartbeat.interval.ms":"60000","consumer.session.timeout.ms":"30000","max.poll.records":"10000","hadoop.conf.dir":"/opt/hadoop-3.1.3/etc/hadoop","offsets.retention.minutes":"1440","name":"hdfs-sink"},"tasks":[{"connector":"hdfs-sink","task":0}],"type":"sink"}[root@debezium docker-hadoopmaster]
Environment variables:
KAFKA_HOME=/kafka
HOSTNAME=20a482382ba9
CONFIG_STORAGE_TOPIC=my_connect_configs
TERM=xterm
HISTSIZE=1000
AVRO_JACKSON_VERSION=1.9.13
SCALA_VERSION=2.12
CORE_CONF_fs_defaultFS=hdfs://namenode:9000
KAFKA_CONNECT_PLUGINS_DIR=/kafka/connect
USER=root
POSTGRES_MD5=955675a36c7af5c2cf7f9f85d4abac3b
MAVEN_REPO_INCUBATOR=
MYSQL_MD5=41820a2e872676d4e9acaeb3cb2b2c13
CONNECT_PLUGIN_PATH=
MAIL=/var/spool/mail/root
PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin
KAFKA_VERSION=2.4.0
PWD=/kafka
MAVEN_REPO_CENTRAL=
JAVA_HOME=/etc/alternatives/jre
ORACLE_MD5=6557bb454dcef2deb4aeb4e148cc313f
GROUP_ID=1
HADOOP_CONF_DIR=/opt/hadoop-3.1.3/etc/hadoop
DEBEZIUM_VERSION=1.0.0.Final
HISTCONTROL=ignoredups
JAVA_MAJOR_VERSION=11
BOOTSTRAP_SERVERS=172.18.0.10:9092
SQLSERVER_MD5=f86d8d403a3265cb4f550ed5ddc457d7
SHLVL=1
HOME=/root
JAVA_APP_DIR=/deployments
STATUS_STORAGE_TOPIC=my_connect_statuses
LOGNAME=root
MONGODB_MD5=921d69952d2277b9664c69d2e4fbac9c
SHA512HASH=53B52F86EA56C9FAC62046524F03F75665A089EA2DAE554AEFE3A3D2694F2DA88B5BA8725D8BE55F198BA80695443559ED9DE7C0B2A2817F7A6141008FF79F49
CONFLUENT_VERSION=5.1.2
MAVEN_DEP_DESTINATION=/kafka/connect
OFFSET_STORAGE_TOPIC=my_connect_offsets
AVRO_VERSION=1.8.2
KAFKA_URL_PATH=kafka/2.4.0/kafka_2.12-2.4.0.tgz
Kafka Connect log:
2020-01-22 14:41:51,716 INFO || EnrichedConnectorConfig values:
config.action.reload = restart
connector.class = io.confluent.connect.hdfs.HdfsSinkConnector
errors.deadletterqueue.context.headers.enable = false
errors.deadletterqueue.topic.name =
errors.deadletterqueue.topic.replication.factor = 3
errors.log.enable = false
errors.log.include.messages = false
errors.retry.delay.max.ms = 60000
errors.retry.timeout = 0
errors.tolerance = none
header.converter = null
key.converter = null
name = hdfs-sink
tasks.max = 1
topics = [dbserver1, dbserver1.inventory.products, dbserver1.inventory.products_on_hand, dbserver1.inventory.customers, dbserver1.inventory.orders, dbserver1.inventory.geom, dbserver1.inventory.addresses]
topics.regex =
transforms = []
value.converter = null
[org.apache.kafka.connect.runtime.ConnectorConfig$EnrichedConnectorConfig]
2020-01-22 14:41:51,716 INFO || ConsumerConfig values:
allow.auto.create.topics = true
auto.commit.interval.ms = 5000
auto.offset.reset = earliest
bootstrap.servers = [172.18.0.10:9092]
check.crcs = true
client.dns.lookup = default
client.id = connector-consumer-hdfs-sink-0
client.rack =
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = connect-hdfs-sink
group.instance.id = null
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = true
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 500
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
security.providers = null
send.buffer.bytes = 131072
session.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
[org.apache.kafka.clients.consumer.ConsumerConfig]
2020-01-22 14:41:51,718 INFO || Kafka version: 2.4.0 [org.apache.kafka.common.utils.AppInfoParser]
2020-01-22 14:41:51,718 INFO || Kafka commitId: 77a89fcf8d7fa018 [org.apache.kafka.common.utils.AppInfoParser]
2020-01-22 14:41:51,718 INFO || Kafka startTimeMs: 1579704111718 [org.apache.kafka.common.utils.AppInfoParser]
2020-01-22 14:41:51,719 INFO || [Worker clientId=connect-1, groupId=1] Finished starting connectors and tasks [org.apache.kafka.connect.runtime.distributed.DistributedHerder]
2020-01-22 14:41:51,722 INFO || [Consumer clientId=connector-consumer-hdfs-sink-0, groupId=connect-hdfs-sink] Subscribed to topic(s): dbserver1, dbserver1.inventory.products, dbserver1.inventory.products_on_hand, dbserver1.inventory.customers, dbserver1.inventory.orders, dbserver1.inventory.geom, dbserver1.inventory.addresses [org.apache.kafka.clients.consumer.KafkaConsumer]
2020-01-22 14:41:51,723 INFO || HdfsSinkConnectorConfig values:
avro.codec = null
connect.hdfs.keytab =
connect.hdfs.principal =
connect.meta.data = true
enhanced.avro.schema.support = false
filename.offset.zero.pad.width = 10
flush.size = 3
format.class = class io.confluent.connect.hdfs.string.StringFormat
hadoop.conf.dir = /opt/hadoop-3.1.3/etc/hadoop
hadoop.home =
hdfs.authentication.kerberos = false
hdfs.namenode.principal =
hdfs.url = null
kerberos.ticket.renew.period.ms = 3600000
logs.dir = logs
retry.backoff.ms = 5000
rotate.interval.ms = -1
rotate.schedule.interval.ms = -1
schema.cache.size = 1000
schema.compatibility = NONE
shutdown.timeout.ms = 3000
[io.confluent.connect.hdfs.HdfsSinkConnectorConfig]
2020-01-22 14:41:51,723 INFO || StorageCommonConfig values:
directory.delim = /
file.delim = +
storage.class = class io.confluent.connect.hdfs.storage.HdfsStorage
store.url = hdfs://namenode:9000
topics.dir = kafka
[io.confluent.connect.storage.common.StorageCommonConfig]
2020-01-22 14:41:51,723 INFO || HiveConfig values:
hive.conf.dir =
hive.database = default
hive.home =
hive.integration = false
hive.metastore.uris =
[io.confluent.connect.storage.hive.HiveConfig]
2020-01-22 14:41:51,723 INFO || PartitionerConfig values:
locale =
partition.duration.ms = -1
partition.field.name = [day]
partitioner.class = class io.confluent.connect.hdfs.partitioner.DefaultPartitioner
path.format =
timestamp.extractor = Wallclock
timestamp.field = timestamp
timezone =
[io.confluent.connect.storage.partitioner.PartitionerConfig]
2020-01-22 14:41:51,723 INFO || AvroDataConfig values:
connect.meta.data = true
enhanced.avro.schema.support = false
schemas.cache.config = 1000
[io.confluent.connect.avro.AvroDataConfig]
2020-01-22 14:41:51,723 INFO || Hadoop configuration directory /opt/hadoop-3.1.3/etc/hadoop [io.confluent.connect.hdfs.DataWriter]
2020-01-22 14:41:51,776 INFO || The connector relies on offsets in HDFS filenames, but does commit these offsets to Connect to enable monitoring progress of the HDFS connector. Upon startup, the HDFS Connector restores offsets from filenames in HDFS. In the absence of files in HDFS, the connector will attempt to find offsets for its consumer group in the '__consumer_offsets' topic. If offsets are not found, the consumer will rely on the reset policy specified in the 'consumer.auto.offset.reset' property to start exporting data to HDFS. [io.confluent.connect.hdfs.HdfsSinkTask]
2020-01-22 14:41:51,776 INFO || WorkerSinkTask{id=hdfs-sink-0} Sink task finished initialization and start [org.apache.kafka.connect.runtime.WorkerSinkTask]
2020-01-22 14:41:51,782 INFO || [Consumer clientId=connector-consumer-hdfs-sink-0, groupId=connect-hdfs-sink] Cluster ID: 0TvMS3DVTIur7n7N3uOQpw [org.apache.kafka.clients.Metadata]
2020-01-22 14:41:51,782 INFO || [Consumer clientId=connector-consumer-hdfs-sink-0, groupId=connect-hdfs-sink] Discovered group coordinator 172.18.0.10:9092 (id: 2147483646 rack: null) [org.apache.kafka.clients.consumer.internals.AbstractCoordinator]
2020-01-22 14:41:51,789 INFO || [Consumer clientId=connector-consumer-hdfs-sink-0, groupId=connect-hdfs-sink] (Re-)joining group [org.apache.kafka.clients.consumer.internals.AbstractCoordinator]
2020-01-22 14:41:51,791 INFO || [Consumer clientId=connector-consumer-hdfs-sink-0, groupId=connect-hdfs-sink] (Re-)joining group [org.apache.kafka.clients.consumer.internals.AbstractCoordinator]
2020-01-22 14:41:51,793 INFO || [Consumer clientId=connector-consumer-hdfs-sink-0, groupId=connect-hdfs-sink] Finished assignment for group at generation 3: {connector-consumer-hdfs-sink-0-423aaedf-7e73-4d83-866d-53f41f440ec5=org.apache.kafka.clients.consumer.ConsumerPartitionAssignor$Assignment@7d32b3} [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator]
2020-01-22 14:41:51,795 INFO || [Consumer clientId=connector-consumer-hdfs-sink-0, groupId=connect-hdfs-sink] Successfully joined group with generation 3 [org.apache.kafka.clients.consumer.internals.AbstractCoordinator]
2020-01-22 14:41:51,795 INFO || [Consumer clientId=connector-consumer-hdfs-sink-0, groupId=connect-hdfs-sink] Adding newly assigned partitions: dbserver1-0, dbserver1.inventory.geom-0, dbserver1.inventory.products-0, dbserver1.inventory.customers-0, dbserver1.inventory.orders-0, dbserver1.inventory.products_on_hand-0, dbserver1.inventory.addresses-0 [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator]
2020-01-22 14:41:51,796 INFO || [Consumer clientId=connector-consumer-hdfs-sink-0, groupId=connect-hdfs-sink] Found no committed offset for partition dbserver1-0 [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator]
2020-01-22 14:41:51,796 INFO || [Consumer clientId=connector-consumer-hdfs-sink-0, groupId=connect-hdfs-sink] Found no committed offset for partition dbserver1.inventory.geom-0 [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator]
2020-01-22 14:41:51,796 INFO || [Consumer clientId=connector-consumer-hdfs-sink-0, groupId=connect-hdfs-sink] Found no committed offset for partition dbserver1.inventory.products-0 [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator]
2020-01-22 14:41:51,796 INFO || [Consumer clientId=connector-consumer-hdfs-sink-0, groupId=connect-hdfs-sink] Found no committed offset for partition dbserver1.inventory.orders-0 [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator]
2020-01-22 14:41:51,796 INFO || [Consumer clientId=connector-consumer-hdfs-sink-0, groupId=connect-hdfs-sink] Found no committed offset for partition dbserver1.inventory.customers-0 [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator]
2020-01-22 14:41:51,796 INFO || [Consumer clientId=connector-consumer-hdfs-sink-0, groupId=connect-hdfs-sink] Found no committed offset for partition dbserver1.inventory.products_on_hand-0 [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator]
2020-01-22 14:41:51,796 INFO || [Consumer clientId=connector-consumer-hdfs-sink-0, groupId=connect-hdfs-sink] Found no committed offset for partition dbserver1.inventory.addresses-0 [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator]
2020-01-22 14:41:51,799 INFO || [Consumer clientId=connector-consumer-hdfs-sink-0, groupId=connect-hdfs-sink] Resetting offset for partition dbserver1-0 to offset 0. [org.apache.kafka.clients.consumer.internals.SubscriptionState]
2020-01-22 14:41:51,799 INFO || [Consumer clientId=connector-consumer-hdfs-sink-0, groupId=connect-hdfs-sink] Resetting offset for partition dbserver1.inventory.geom-0 to offset 0. [org.apache.kafka.clients.consumer.internals.SubscriptionState]
2020-01-22 14:41:51,799 INFO || [Consumer clientId=connector-consumer-hdfs-sink-0, groupId=connect-hdfs-sink] Resetting offset for partition dbserver1.inventory.products-0 to offset 0. [org.apache.kafka.clients.consumer.internals.SubscriptionState]
2020-01-22 14:41:51,799 INFO || [Consumer clientId=connector-consumer-hdfs-sink-0, groupId=connect-hdfs-sink] Resetting offset for partition dbserver1.inventory.customers-0 to offset 0. [org.apache.kafka.clients.consumer.internals.SubscriptionState]
2020-01-22 14:41:51,799 INFO || [Consumer clientId=connector-consumer-hdfs-sink-0, groupId=connect-hdfs-sink] Resetting offset for partition dbserver1.inventory.orders-0 to offset 0. [org.apache.kafka.clients.consumer.internals.SubscriptionState]
2020-01-22 14:41:51,799 INFO || [Consumer clientId=connector-consumer-hdfs-sink-0, groupId=connect-hdfs-sink] Resetting offset for partition dbserver1.inventory.products_on_hand-0 to offset 0. [org.apache.kafka.clients.consumer.internals.SubscriptionState]
2020-01-22 14:41:51,799 INFO || [Consumer clientId=connector-consumer-hdfs-sink-0, groupId=connect-hdfs-sink] Resetting offset for partition dbserver1.inventory.addresses-0 to offset 0. [org.apache.kafka.clients.consumer.internals.SubscriptionState]
2020-01-22 14:41:51,800 INFO || Started recovery for topic partition dbserver1-0 [io.confluent.connect.hdfs.TopicPartitionWriter]
2020-01-22 14:41:51,821 INFO || Successfully acquired lease for hdfs://namenode:9000/logs/dbserver1/0/log [io.confluent.connect.hdfs.wal.FSWAL]
2020-01-22 14:41:51,870 WARN || WorkerSinkTask{id=hdfs-sink-0} Offset commit failed during close [org.apache.kafka.connect.runtime.WorkerSinkTask]
2020-01-22 14:41:51,870 ERROR || WorkerSinkTask{id=hdfs-sink-0} Commit of offsets threw an unexpected exception for sequence number 1: null [org.apache.kafka.connect.runtime.WorkerSinkTask]
java.lang.NullPointerException
at io.confluent.connect.hdfs.DataWriter.getCommittedOffsets(DataWriter.java:520)
at io.confluent.connect.hdfs.HdfsSinkTask.preCommit(HdfsSinkTask.java:141)
at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:379)
at org.apache.kafka.connect.runtime.WorkerSinkTask.closePartitions(WorkerSinkTask.java:591)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:196)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
2020-01-22 14:41:51,965 ERROR || WorkerSinkTask{id=hdfs-sink-0} Task threw an uncaught and unrecoverable exception [org.apache.kafka.connect.runtime.WorkerTask]
java.lang.IllegalArgumentException: Wrong FS: hdfs://172.17.0.7:9000/kafka/dbserver1/partition=0/dbserver1+0+0000000006+0000000008.parquet, expected: hdfs://namenode:9000
at org.apache.hadoop.fs.FileSystem.checkPath(FileSystem.java:649)
at org.apache.hadoop.hdfs.DistributedFileSystem.getPathName(DistributedFileSystem.java:194)
at org.apache.hadoop.hdfs.DistributedFileSystem.access$000(DistributedFileSystem.java:106)
at org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1305)
at org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1301)
at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1317)
at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1426)
at io.confluent.connect.hdfs.storage.HdfsStorage.exists(HdfsStorage.java:149)
at io.confluent.connect.hdfs.wal.FSWAL.apply(FSWAL.java:123)
at io.confluent.connect.hdfs.TopicPartitionWriter.applyWAL(TopicPartitionWriter.java:649)
at io.confluent.connect.hdfs.TopicPartitionWriter.recover(TopicPartitionWriter.java:260)
at io.confluent.connect.hdfs.DataWriter.recover(DataWriter.java:380)
at io.confluent.connect.hdfs.DataWriter.open(DataWriter.java:444)
at io.confluent.connect.hdfs.HdfsSinkTask.open(HdfsSinkTask.java:155)
at org.apache.kafka.connect.runtime.WorkerSinkTask.openPartitions(WorkerSinkTask.java:587)
at org.apache.kafka.connect.runtime.WorkerSinkTask.access$1100(WorkerSinkTask.java:67)
at org.apache.kafka.connect.runtime.WorkerSinkTask$HandleRebalance.onPartitionsAssigned(WorkerSinkTask.java:652)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokePartitionsAssigned(ConsumerCoordinator.java:272)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:400)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:421)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:340)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:471)
at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1267)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1231)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211)
at org.apache.kafka.connect.runtime.WorkerSinkTask.pollConsumer(WorkerSinkTask.java:444)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:317)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
2020-01-22 14:41:51,979 ERROR || WorkerSinkTask{id=hdfs-sink-0} Task is being killed and will not recover until manually restarted [org.apache.kafka.connect.runtime.WorkerTask]
2020-01-22 14:41:51,981 INFO || [Consumer clientId=connector-consumer-hdfs-sink-0, groupId=connect-hdfs-sink] Revoke previously assigned partitions dbserver1-0, dbserver1.inventory.geom-0, dbserver1.inventory.products-0, dbserver1.inventory.customers-0, dbserver1.inventory.orders-0, dbserver1.inventory.products_on_hand-0, dbserver1.inventory.addresses-0 [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator]
2020-01-22 14:41:51,982 INFO || [Consumer clientId=connector-consumer-hdfs-sink-0, groupId=connect-hdfs-sink] Member connector-consumer-hdfs-sink-0-423aaedf-7e73-4d83-866d-53f41f440ec5 sending LeaveGroup request to coordinator 172.18.0.10:9092 (id: 2147483646 rack: null) due to the consumer is being closed [org.apache.kafka.clients.consumer.internals.AbstractCoordinator]
2020-01-22 14:41:52,342 INFO || WorkerSourceTask{id=inventory-connector-0} Committing offsets [org.apache.kafka.connect.runtime.WorkerSourceTask]
2020-01-22 14:41:52,343 INFO || WorkerSourceTask{id=inventory-connector-0} flushing 0 outstanding messages for offset commit [org.apache.kafka.connect.runtime.WorkerSourceTask]
I don't know what is causing this error:
[Consumer clientId=connector-consumer-hdfs-sink-0, groupId=connect-hdfs-sink] Found no committed offset for partition
[Consumer clientId=connector-consumer-hdfs-sink-0, groupId=connect-hdfs-sink] Resetting offset for partition
and then this one:
WorkerSinkTask{id=hdfs-sink-0} Task threw an uncaught and unrecoverable exception [org.apache.kafka.connect.runtime.WorkerTask]
java.lang.IllegalArgumentException: Wrong FS: hdfs://172.17.0.7:9000/kafka/dbserver1/partition=0/dbserver1+0+0000000006+0000000008.parquet, expected: hdfs://namenode:9000
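As far as I understand, this message comes from Hadoop comparing the scheme and authority of the path against the filesystem the client was opened for, roughly like this (my own paraphrase of FileSystem.checkPath, not the actual Hadoop code):

// Rough paraphrase (assumption) of the scheme/authority comparison behind "Wrong FS".
import java.net.URI;

public class WrongFsCheck {
    static void checkPath(URI fsUri, URI pathUri) {
        boolean sameScheme = pathUri.getScheme() == null
                || pathUri.getScheme().equalsIgnoreCase(fsUri.getScheme());
        boolean sameAuthority = pathUri.getAuthority() == null
                || pathUri.getAuthority().equalsIgnoreCase(fsUri.getAuthority());
        if (!sameScheme || !sameAuthority) {
            throw new IllegalArgumentException("Wrong FS: " + pathUri + ", expected: " + fsUri);
        }
    }

    public static void main(String[] args) {
        URI expected = URI.create("hdfs://namenode:9000");
        // The path from the WAL in the stack trace still carries the IP-based authority.
        URI actual = URI.create("hdfs://172.17.0.7:9000/kafka/dbserver1/partition=0/"
                + "dbserver1+0+0000000006+0000000008.parquet");
        checkPath(expected, actual); // throws: Wrong FS ... expected: hdfs://namenode:9000
    }
}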
Do you know how I can solve this problem?
Anything that looks badly wrong to you is to be expected; I'm still learning and counting on your knowledge here. Thanks in advance!