Доступ к хранилищу блобов Azure - PullRequest
0 голосов
/ 17 декабря 2018

ОБНОВЛЕНИЕ Я использовал клиент для загрузки файлов в существующий BLOB-объект.Первый файл успешно загружен, но программа выведет те же сообщения отладки SslHandler о Handshaking.Для меня это означает, что клиент может взаимодействовать с Azure, выполнять операции, но не может получить ответ.В моем коде я использую метод blockingGet () для ожидания ответа от Azure. Очевидно, что-то блокирует Azure от отправки ответа вызывающей стороне.Я получаю это исключение:

Exception in thread "main" java.lang.RuntimeException: java.util.concurrent.TimeoutException: The source did not signal an event for 60 seconds and has been terminated.
    at io.reactivex.internal.util.ExceptionHelper.wrapOrThrow(ExceptionHelper.java:46)
    at io.reactivex.internal.observers.BlockingMultiObserver.blockingGet(BlockingMultiObserver.java:93)
    at io.reactivex.Single.blockingGet(Single.java:2700)
    at azure.AzureBlockBlobClient.createContainer(AzureBlockBlobClient.java:68)
    at spark.AzureExample.<init>(AzureExample.java:18)
    at spark.AzureExample.main(AzureExample.java:28)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:775)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:119)

Я написал код, который подключается к моей учетной записи хранения BLOB-объектов Azure и создает контейнер.

Если я запускаю его как основной класс напрямуюиз фляги или просто запустив файл класса из моей Intellij IDE, все работает.Если я запускаю его из команды spark-submit, передавая класс для запуска и jar, он не работает.Я получаю следующее исключение:

2108 [nioEventLoopGroup-1-1] DEBUG io.netty.handler.ssl.SslHandler  - [id: 0xa1cbec16, L:/10.19.117.127:56010 - R:storageaccount.blob.core.windows.net/20.138.98.132:443] HANDSHAKEN: TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256
65718 [nioEventLoopGroup-1-3] DEBUG io.netty.handler.ssl.SslHandler  - [id: 0xdd8a9177, L:/10.19.117.127:56019 - R:storageaccount.blob.core.windows.net/20.138.98.132:443] HANDSHAKEN: TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256
137718 [nioEventLoopGroup-1-5] DEBUG io.netty.handler.ssl.SslHandler  - [id: 0xffca3cfa, L:/10.19.117.127:56022 - R:storageaccount.blob.core.windows.net/20.138.98.132:443] HANDSHAKEN: TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256
225794 [nioEventLoopGroup-1-7] DEBUG io.netty.handler.ssl.SslHandler  - [id: 0xb957ab94, L:/10.19.117.127:56030 - R:storageaccount.blob.core.windows.net/20.138.98.132:443] HANDSHAKEN: TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256

Exception in thread "main" java.lang.RuntimeException: java.util.concurrent.TimeoutException: The source did not signal an event for 60 seconds and has been terminated.
    at io.reactivex.internal.util.ExceptionHelper.wrapOrThrow(ExceptionHelper.java:46)
    at io.reactivex.internal.observers.BlockingMultiObserver.blockingGet(BlockingMultiObserver.java:93)
    at io.reactivex.Single.blockingGet(Single.java:2700)
    at AzureBlockBlobClient.createContainer(AzureBlockBlobClient.java:58)
    at DataExtractor.<init>(DataExtractor.java:69)
    at DataExtractor.main(DataExtractor.java:355)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:564)
    at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
    at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:849)
    at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:167)
    at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:195)
    at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
    at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:924)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:933)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

Когда это работает, он просто печатает строку, похожую на эту строку, а затем продолжает выполнение:

2108 [nioEventLoopGroup-1-1] DEBUG io.netty.handler.ssl.SslHandler  - [id: 0xa1cbec16, L:/10.19.117.127:56010 - R:storageaccount.blob.core.windows.net/20.138.98.132:443] HANDSHAKEN: TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256

Когда я запускаю его на свече, онпечатает эти 4 строки из SslHandler - минута или около того ожидания между каждой строкой - и затем выдает исключение.

2108 [nioEventLoopGroup-1-1] DEBUG io.netty.handler.ssl.SslHandler  - [id: 0xa1cbec16, L:/10.19.117.127:56010 - R:storageaccount.blob.core.windows.net/20.138.98.132:443] HANDSHAKEN: TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256
65718 [nioEventLoopGroup-1-3] DEBUG io.netty.handler.ssl.SslHandler  - [id: 0xdd8a9177, L:/10.19.117.127:56019 - R:storageaccount.blob.core.windows.net/20.138.98.132:443] HANDSHAKEN: TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256
137718 [nioEventLoopGroup-1-5] DEBUG io.netty.handler.ssl.SslHandler  - [id: 0xffca3cfa, L:/10.19.117.127:56022 - R:storageaccount.blob.core.windows.net/20.138.98.132:443] HANDSHAKEN: TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256
225794 [nioEventLoopGroup-1-7] DEBUG io.netty.handler.ssl.SslHandler  - [id: 0xb957ab94, L:/10.19.117.127:56030 - R:storageaccount.blob.core.windows.net/20.138.98.132:443] HANDSHAKEN: TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256

Вот некоторые фрагменты моего кода:

Это кодмоего клиента, который подключается к Azure:

public AzureBlockBlobClient(String accountName, String accountKey, String containerName) {
    this.accountName = accountName;
    this.accountKey = accountKey;
    this.containerName = containerName;
    init();
}

private void init() {
    log.info("Init AzureBlockBlobClient started...");
    try {
        SharedKeyCredentials creds = new SharedKeyCredentials(accountName, accountKey);
        serviceURL = new ServiceURL(new URL("https://" + accountName + ".blob.core.windows.net/"),
            StorageURL.createPipeline(creds, new PipelineOptions()));
        containerURL = serviceURL.createContainerURL(containerName);
    }catch (InvalidKeyException e){
        log.error("Authentication error while trying to access storage account", e);
    }catch (MalformedURLException e) {
        log.error("Invalid Service URL", e);
        e.printStackTrace();
    }catch (Exception e) {
        e.printStackTrace();
        log.error("Error initializing AzureBlockBlobClient", e);
    }

    log.info("Init AzureBlockBlobClient Done!");
}

public void createContainer(){
    try {
        // Let's create a container using a blocking call to Azure Storage
        // If container exists, we'll catch and continue
        log.info("Creating container {}." , containerName);
        ContainerCreateResponse response = containerURL.create(null, null, null).blockingGet();
        log.info("Container Create Response was {}." , response.statusCode());
    }
    catch (RestException e){
        if (e instanceof RestException && e.response().statusCode() != 409) {
            log.error("Error Creating container", e);
        } else {
            log.info("Container {} already exists, resuming...", containerName);
        }
    }
}

Вот где я на самом деле звоню, чтобы создать контейнер:

private AzureBlockBlobClient azureBlockBlobClient;

public AzureExample() {
    azureBlockBlobClient = new AzureBlockBlobClient(AzureConf.ACCOUNT_NAME,AzureConf.ACCOUNT_KEY, AzureConf.CONTAINER_NAME);
    azureBlockBlobClient.createContainer();
}

public static void main(String... args) throws IOException {
    new AzureExample();
    System.exit(0);
}

И вот где мои константы:

public interface AzureConf {
String ACCOUNT_KEY ="<SomeAccountKey>";
String ACCOUNT_NAME = "storage";
String CONTAINER_NAME = "My-container";
}

Это мой файл maven pom.xml:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
     xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>examples</groupId>
<artifactId>spark-azure-storage</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
    <junit.version>4.12</junit.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-azure</artifactId>
        <version>2.7.1</version>
        <scope>compile</scope>
    </dependency>
    <dependency>
        <groupId>com.microsoft.azure</groupId>
        <artifactId>azure-storage</artifactId>
        <version>2.0.0</version>
        <scope>runtime</scope>
    </dependency>
    <dependency>
        <groupId>com.microsoft.azure</groupId>
        <artifactId>azure-storage-blob</artifactId>
        <version>10.1.0</version>
    </dependency>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>${junit.version}</version>
        <scope>runtime</scope>
    </dependency>
    <dependency>
        <groupId>io.reactivex.rxjava2</groupId>
        <artifactId>rxjava</artifactId>
        <version>2.2.3</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>1.7.16</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/com.typesafe.akka/akka-actor -->


    <dependency>
        <groupId>com.microsoft.rest.v2</groupId>
        <artifactId>client-runtime</artifactId>
        <version>2.0.0</version>
        <!--I have to exclude following dependencies and include version 2.9.7 of them otherwise I get
        SoSuchMethodError-->
        <exclusions>
            <exclusion>
                <groupId>com.fasterxml.jackson.core</groupId>
                <artifactId>jackson-annotations</artifactId>
            </exclusion>
            <exclusion>
                <groupId>com.fasterxml.jackson.core</groupId>
                <artifactId>jackson-core</artifactId>
            </exclusion>
            <exclusion>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>1.7.16</version>
    </dependency>
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-core</artifactId>
        <version>2.9.7</version>
    </dependency>
    <dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-annotations</artifactId>
    <version>2.9.7</version>
    </dependency>
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
        <version>2.9.7</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>2.2.1</version>
    </dependency>
</dependencies>
<build>
    <plugins>
        <plugin>
            <groupId>org.codehaus.mojo</groupId>
            <artifactId>exec-maven-plugin</artifactId>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.8.0</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>2.4.3</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                </execution>
            </executions>
            <configuration>
                <filters>
                    <filter>
                        <artifact>*:*</artifact>
                        <excludes>
                            <exclude>META-INF/*.SF</exclude>
                            <exclude>META-INF/*.DSA</exclude>
                            <exclude>META-INF/*.RSA</exclude>
                        </excludes>
                    </filter>
                </filters>
                <transformers>
                    <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                        <resource>reference.conf</resource>
                    </transformer>
                    <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                    <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"></transformer>
                </transformers>
            </configuration>
        </plugin>
    </plugins>
</build>

Есть идеи, почему это не работает?Это рукопожатие между netty и Azure, которое не идет в обоих направлениях?

...