Как использовать google-cloud-storage напрямую в проекте Apache Beam - PullRequest
0 голосов
/ 25 апреля 2018

Мы работаем над проектом Apache Beam (версия 2.4.0), где мы также хотим работать с корзиной напрямую через API google-cloud-storage.Однако объединение некоторых зависимостей луча с облачным хранилищем приводит к трудному решению проблемы зависимостей.

Мы увидели, что Beam 2.4.0 зависит от облачного хранилища 1.22.0, поэтому мы его и используемниже.У нас были те же проблемы с 1.27.0.В следующем файле pom.xml указываются четыре зависимости луча, которые мы используем в нашем проекте, последние две из которых приводят к проблемам.

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.bol</groupId>
    <artifactId>beam-plus-storage</artifactId>
    <version>1.0-SNAPSHOT</version>
    <properties>
        <beam.version>2.4.0</beam.version>
    </properties>

    <dependencies>
        <!-- These first two dependencies do not clash -->
        <dependency>
            <groupId>org.apache.beam</groupId>
            <artifactId>beam-runners-direct-java</artifactId>
            <version>${beam.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.beam</groupId>
            <artifactId>beam-sdks-java-extensions-join-library</artifactId>
            <version>${beam.version}</version>
        </dependency>
        <!-- This one leads to java.lang.ClassNotFoundException: com.google.api.gax.rpc.HeaderProvider -->
        <dependency>
            <groupId>org.apache.beam</groupId>
            <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
            <version>${beam.version}</version>
        </dependency>
        <!-- This one leads to java.lang.NoSuchMethodError: com.google.api.services.storage.Storage$Objects$List.setUserProject(...) -->
        <dependency>
            <groupId>org.apache.beam</groupId>
            <artifactId>beam-runners-google-cloud-dataflow-java</artifactId>
            <version>${beam.version}</version>
        </dependency>

        <dependency>
            <groupId>com.google.cloud</groupId>
            <artifactId>google-cloud-storage</artifactId>
            <version>1.22.0</version>
        </dependency>

    </dependencies>
</project>

Ниже приведено минимальное рабочее / неработающее использование API хранилища со списком файлов изобщедоступное ведро.

import com.google.api.gax.paging.Page;
import com.google.cloud.storage.Blob;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;

public class CloudStorageReader {

    public static void main(String[] args) {
        Storage storage = StorageOptions.getDefaultInstance().getService();
        Page<Blob> list = storage.list("gcp-public-data-landsat", Storage.BlobListOption.currentDirectory(), Storage.BlobListOption.prefix("LC08/PRE/044/034/LC80440342016259LGN00/"));
        for (Blob blob : list.getValues()) {
            System.out.println(blob);
        }
    }
}

При удалении двух последних зависимостей перечисление содержимого ведра работает нормально.С зависимостью луча java-io класс HeaderProvider не найден.С зависимостью потока данных метод setUserProject не найден.См. Комментарии в pom для полных сообщений об ошибках.

Мы потратили довольно много времени, пытаясь исправить ошибку HeaderProvider, которая появляется, когда импортируются все четыре зависимости луча.Мы добавили явный импорт для конфликтующих зависимостей, добавив исключение также и для импорта балок.Каждый раз, когда мы добавляли явную зависимость, возникала еще одна связанная проблема.Мы предприняли попытку затенения maven, что не очень практично из-за упаковки нашего проекта, поэтому так и не получили его.

В конце мы прибегли к созданию отдельного подмодуля + jar для взаимодействия облачного хранилища., усложняя нашу упаковку / запуск.

В заключение отметим, что у нас возникла та же проблема при попытке использовать API BigQuery, но мы решили эту проблему, повторно использовав закрытый для пакета код луча.

Было бы замечательно, если бы у кого-нибудь был (относительно простой) способ заставить эти библиотеки работать вместе, или он мог бы подтвердить, что это действительно сложная проблема зависимости, которую, возможно, придется решать в Apache Beam.

1 Ответ

0 голосов
/ 26 апреля 2018

Вместо включения отдельной зависимости для облачного хранилища, вы можете использовать включенный в Beam API FileSystems для отображения списков, чтения / записи файлов и удаления объектов в облачном хранилище. Ниже приведен пример, в котором перечислены все файлы в сегменте, а затем считан один из этих файлов в строку.

// Set the default pipeline options so the various filesystems are
// loaded into the registry. This shouldn't be necessary if used
// within a pipeline.
FileSystems.setDefaultPipelineOptions(PipelineOptionsFactory.create());

// List Bucket
MatchResult listResult = FileSystems.match("gs://filesystems-demo/**/*");
listResult
    .metadata()
    .forEach(
        metadata -> {
          ResourceId resourceId = metadata.resourceId();
          System.out.println(resourceId.toString());
        });


// Read file
ResourceId existingFileResourceId = FileSystems
    .matchSingleFileSpec("gs://filesystems-demo/test-file1.csv")
    .resourceId();

try (ByteArrayOutputStream out = new ByteArrayOutputStream();
    ReadableByteChannel readerChannel = FileSystems.open(existingFileResourceId);
    WritableByteChannel writerChannel = Channels.newChannel(out)) {
  ByteStreams.copy(readerChannel, writerChannel);

  System.out.println("File contents: \n" + out.toString());
}


// Write file
String contentToWrite = "Laces out Dan!";

ResourceId newFileResourceId = FileSystems
    .matchNewResource("gs://filesystems-demo/new-file.txt", false);

try (ByteArrayInputStream in = new ByteArrayInputStream(contentToWrite.getBytes());
    ReadableByteChannel readerChannel = Channels.newChannel(in);
    WritableByteChannel writerChannel = FileSystems.create(newFileResourceId, MimeTypes.TEXT)) {

  ByteStreams.copy(readerChannel, writerChannel);
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...