Я пытаюсь подключиться к Azure Data Explorer, используя Confluent Kafka Sink Connector.https://github.com/Azure/kafka-sink-azure-kusto Когда записи добавляются в источник (MsSql), он не может отправить данные в приемник.
Экземпляр Aws Ec2 имеет встроенные док-контейнеры для Kafka-connect / Zookeeper / control-center и т. Д. Кувшин для коннектора раковины kafka был помещен в папку плагинов.Docker-compose обеспечивает доступность этого коннектора для док-контейнера kafka-connect через тома.С помощью пользовательского интерфейса центра управления подключаются разъем источника MsSql и разъем приемника Kusto.Когда записи добавляются в MsSql, выдается следующее исключение -
[2019-06-06 00:19:02,140] INFO Refreshing Ingestion Resources (com.microsoft.azure.kusto.ingest.ResourceManager)
[2019-06-06 00:19:02,140] ERROR ingestFromFile: Error ingesting local file: /var/tmp/sinklog/kafka_kafkatest1_0_0.gz (com.microsoft.azure.kusto.ingest.IngestClientImpl)
java.lang.IllegalArgumentException: username is null or empty
at com.microsoft.aad.adal4j.AuthenticationContext.acquireToken(AuthenticationContext.java:205)
at com.microsoft.azure.kusto.data.AadAuthenticationHelper.acquireAadUserAccessToken(AadAuthenticationHelper.java:55)
at com.microsoft.azure.kusto.data.AadAuthenticationHelper.acquireAccessToken(AadAuthenticationHelper.java:43)
at com.microsoft.azure.kusto.data.KustoClient.execute(KustoClient.java:34)
at com.microsoft.azure.kusto.data.KustoClient.execute(KustoClient.java:30)
at com.microsoft.azure.kusto.data.KustoClient.execute(KustoClient.java:20)
at com.microsoft.azure.kusto.ingest.ResourceManager.refreshIngestionResources(ResourceManager.java:127)
at com.microsoft.azure.kusto.ingest.ResourceManager.getIngestionResource(ResourceManager.java:104)
at com.microsoft.azure.kusto.ingest.IngestClientImpl.ingestFromFile(IngestClientImpl.java:109)
at com.microsoft.azure.kusto.kafka.connect.sink.TopicPartitionWriter.handleRollFile(TopicPartitionWriter.java:43)
at com.microsoft.azure.kusto.kafka.connect.sink.GZIPFileWriter.finishFile(GZIPFileWriter.java:100)
at com.microsoft.azure.kusto.kafka.connect.sink.GZIPFileWriter.rotate(GZIPFileWriter.java:93)
at com.microsoft.azure.kusto.kafka.connect.sink.GZIPFileWriter.write(GZIPFileWriter.java:57)
at com.microsoft.azure.kusto.kafka.connect.sink.TopicPartitionWriter.writeRecord(TopicPartitionWriter.java:79)
at com.microsoft.azure.kusto.kafka.connect.sink.KustoSinkTask.put(KustoSinkTask.java:203)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:565)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:323)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:226)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:194)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Конфигурация приемника выглядит следующим образом:
{
"name": "KustoSink1",
"connector.class": "com.microsoft.azure.kusto.kafka.connect.sink.KustoSinkConnector",
"tasks.max": "1",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.storage.StringConverter",
"config.action.reload": "restart",
"errors.retry.timeout": "0",
"errors.log.enable": "true",
"errors.log.include.messages": "true",
"topics": "kafkatest1",
"errors.deadletterqueue.topic.name": "",
"errors.deadletterqueue.topic.replication.factor": "3",
"errors.deadletterqueue.context.headers.enable": "false",
"kusto.url": "https://ingest-xxxx.kusto.windows.net",
"kusto.db": "kafkaadx01",
"kusto.table": "kafkatest",
"kusto.tables.topics_mapping": "[{'topic': 'kafkatest1','db': 'kafkaadx01', 'table': 'kafkatest','format': 'json', 'mapping':'TestMapping'}]",
"kusto.auth.username": "xxx",
"kusto.auth.password": "xxxx",
"kusto.auth.appid": "",
"kusto.auth.appkey": "",
"kusto.auth.authority": "XXXX",
"kusto.sink.tempdir": "/var/tmp/sinklog",
"kusto.sink.flush_size": "1024",
"kusto.sink.flush_interval_ms": "1000"
}
Я выполнил программу на Python из той же среды, используяте же учетные данные, и это сработало (это была операция чтения из программы python).Таким образом, учетные данные выглядят хорошо, и проблема с брандмауэром тоже не возникает.
Пожалуйста, помогите мне в решении этой проблемы.