Мы используем kafka connect в HDP 2.6.4, сливной соединитель и реестр схем hortonwork. kafka connect Kerberised, запись в удаленный кластер, распространение. По моему мнению, он может читать данные из темы, но не записывает их обратно в HDFS. у нас есть 2 разъема с одинаковой конфигурацией, и один работает без проблем (но разные темы, назначение HDFS / Hive, пользователь и безопасность).
connector.json:
"connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
"topics.dir": "/data/...",
"flush.size": "44000",
"tasks.max": "3",
"connect.hdfs.principal": "principal",
"hive.database": "hive_DB",
"rotate.interval.ms": "60000",
"logs.dir": "/tmp/kafka_connect/...",
"format.class": "custom avro",
"hive.integration": "true",
"ssl.key.password": "password",
"hive.conf.dir": "/path/to/hive/conf",
"ssl.truststore.password": "password",
"hadoop.conf.dir": "/path/to/hadoop/conf",
"schema.compatibility": "BACKWARD",
"topics": "topic",
"connect.hdfs.keytab": "/path/to/keytab",
"hdfs.url": "hdfs://ha-name",
"ssl.keystore.location": "/path/to/keystore.jks",
"hdfs.authentication.kerberos": "true",
"hive.metastore.uris": "thrift://server:9083",
"security.protocol": "SASL_SSL",
"ssl.truststore.location": "/path/to/truststore.jks",
"ssl.keystore.password": "password"
kafka connectработает:
в рейнджере, он может использовать, опубликовать 3 раздела конфигурации: connect-offset, connect-status, connect-config, а также целевой раздел. у нас нет никаких отрицаемых в рейнджере. разрешено писать и читать в журнале HDFS logs.dir, themes.dir ...
это один из наших графических панелей о потребителе kafka connect:
мы также можем сравнить смещение group.id:
день n: группа 1: ток: 27121592 конец: 27121592 лаг: 0 группа 2: ток: 27132622 конец: 27132622 лаг: 0
день n + 1: группа 1: ток: 27700858 конец: 27700858 лаг: 0 группа 2: ток: 27711406 конец: 27711406 лаг: 0
в файле журнала я вижу проблему с 2 потенциалами":
DEBUG Failed to detect a valid hadoop home directory (org.apache.hadoop.util.Shell)
java.io.IOException: Hadoop home directory does not exist, is not a directory, or is not an absolute path.
at org.apache.hadoop.util.Shell.checkHadoopHome(Shell.java:312)
at org.apache.hadoop.util.Shell.<clinit>(Shell.java:327)
at org.apache.hadoop.util.StringUtils.<clinit>(StringUtils.java:79)
at org.apache.hadoop.security.Groups.parseStaticMapping(Groups.java:116)
at org.apache.hadoop.security.Groups.<init>(Groups.java:93)
at org.apache.hadoop.security.Groups.<init>(Groups.java:73)
at org.apache.hadoop.security.Groups.getUserToGroupsMappingService(Groups.java:293)
at org.apache.hadoop.security.UserGroupInformation.initialize(UserGroupInformation.java:283)
at org.apache.hadoop.security.UserGroupInformation.setConfiguration(UserGroupInformation.java:311)
at io.confluent.connect.hdfs.DataWriter.<init>(DataWriter.java:126)
at io.confluent.connect.hdfs.HdfsSinkTask.start(HdfsSinkTask.java:76)
at org.apache.kafka.connect.runtime.WorkerSinkTask.initializeAndStart(WorkerSinkTask.java:221)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:140)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
оба наших разъема указывают на один и тот же интерфейс hadoop, но другой работает нормально.
и:
DEBUG Exception while invoking renewLease of class ClientNamenodeProtocolTranslatorPB over XXXX:8020. Retrying after sleeping for 967ms. (org.apache.hadoop.io.retry.RetryInvocationHandler)
org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.RetriableException): Server is too busy.
at org.apache.hadoop.ipc.Client.call(Client.java:1469)
at org.apache.hadoop.ipc.Client.call(Client.java:1400)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:232)
at com.sun.proxy.$Proxy53.renewLease(Unknown Source)
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.renewLease(ClientNamenodeProtocolTranslatorPB.java:571)
at sun.reflect.GeneratedMethodAccessor127.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
, но это не проблемана мой взгляд
он мог работать долгое время, но с некоторого дня он не может перезапуститься, поэтому мы довольно уверены в конфигурации.