Соединитель приемника Azure Kafka не может подключиться к обозревателю данных Azure - PullRequest
0 голосов
/ 06 июня 2019

Я пытаюсь подключиться к Azure Data Explorer, используя Confluent Kafka Sink Connector.https://github.com/Azure/kafka-sink-azure-kusto Когда записи добавляются в источник (MsSql), он не может отправить данные в приемник.

Экземпляр Aws Ec2 имеет встроенные док-контейнеры для Kafka-connect / Zookeeper / control-center и т. Д. Кувшин для коннектора раковины kafka был помещен в папку плагинов.Docker-compose обеспечивает доступность этого коннектора для док-контейнера kafka-connect через тома.С помощью пользовательского интерфейса центра управления подключаются разъем источника MsSql и разъем приемника Kusto.Когда записи добавляются в MsSql, выдается следующее исключение -

[2019-06-06 00:19:02,140] INFO Refreshing Ingestion Resources (com.microsoft.azure.kusto.ingest.ResourceManager)
[2019-06-06 00:19:02,140] ERROR ingestFromFile: Error ingesting local file: /var/tmp/sinklog/kafka_kafkatest1_0_0.gz (com.microsoft.azure.kusto.ingest.IngestClientImpl)
java.lang.IllegalArgumentException: username is null or empty
    at com.microsoft.aad.adal4j.AuthenticationContext.acquireToken(AuthenticationContext.java:205)
    at com.microsoft.azure.kusto.data.AadAuthenticationHelper.acquireAadUserAccessToken(AadAuthenticationHelper.java:55)
    at com.microsoft.azure.kusto.data.AadAuthenticationHelper.acquireAccessToken(AadAuthenticationHelper.java:43)
    at com.microsoft.azure.kusto.data.KustoClient.execute(KustoClient.java:34)
    at com.microsoft.azure.kusto.data.KustoClient.execute(KustoClient.java:30)
    at com.microsoft.azure.kusto.data.KustoClient.execute(KustoClient.java:20)
    at com.microsoft.azure.kusto.ingest.ResourceManager.refreshIngestionResources(ResourceManager.java:127)
    at com.microsoft.azure.kusto.ingest.ResourceManager.getIngestionResource(ResourceManager.java:104)
    at com.microsoft.azure.kusto.ingest.IngestClientImpl.ingestFromFile(IngestClientImpl.java:109)
    at com.microsoft.azure.kusto.kafka.connect.sink.TopicPartitionWriter.handleRollFile(TopicPartitionWriter.java:43)
    at com.microsoft.azure.kusto.kafka.connect.sink.GZIPFileWriter.finishFile(GZIPFileWriter.java:100)
    at com.microsoft.azure.kusto.kafka.connect.sink.GZIPFileWriter.rotate(GZIPFileWriter.java:93)
    at com.microsoft.azure.kusto.kafka.connect.sink.GZIPFileWriter.write(GZIPFileWriter.java:57)
    at com.microsoft.azure.kusto.kafka.connect.sink.TopicPartitionWriter.writeRecord(TopicPartitionWriter.java:79)
    at com.microsoft.azure.kusto.kafka.connect.sink.KustoSinkTask.put(KustoSinkTask.java:203)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:565)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:323)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:226)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:194)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Конфигурация приемника выглядит следующим образом:

{
  "name": "KustoSink1",
  "connector.class": "com.microsoft.azure.kusto.kafka.connect.sink.KustoSinkConnector",
  "tasks.max": "1",
  "key.converter": "org.apache.kafka.connect.storage.StringConverter",
  "value.converter": "org.apache.kafka.connect.storage.StringConverter",
  "config.action.reload": "restart",
  "errors.retry.timeout": "0",
  "errors.log.enable": "true",
  "errors.log.include.messages": "true",
  "topics": "kafkatest1",
  "errors.deadletterqueue.topic.name": "",
  "errors.deadletterqueue.topic.replication.factor": "3",
  "errors.deadletterqueue.context.headers.enable": "false",
  "kusto.url": "https://ingest-xxxx.kusto.windows.net",
  "kusto.db": "kafkaadx01",
  "kusto.table": "kafkatest",
  "kusto.tables.topics_mapping": "[{'topic': 'kafkatest1','db': 'kafkaadx01', 'table': 'kafkatest','format': 'json', 'mapping':'TestMapping'}]",
  "kusto.auth.username": "xxx",
  "kusto.auth.password": "xxxx",
  "kusto.auth.appid": "",
  "kusto.auth.appkey": "",
  "kusto.auth.authority": "XXXX",
  "kusto.sink.tempdir": "/var/tmp/sinklog",
  "kusto.sink.flush_size": "1024",
  "kusto.sink.flush_interval_ms": "1000"
}

Я выполнил программу на Python из той же среды, используяте же учетные данные, и это сработало (это была операция чтения из программы python).Таким образом, учетные данные выглядят хорошо, и проблема с брандмауэром тоже не возникает.

Пожалуйста, помогите мне в решении этой проблемы.

...