Динамически определять местоположение файла для Google Dataflow - PullRequest
0 голосов
/ 16 декабря 2018

Я использую значение провайдера, чтобы передать дату для моего потока данных

    @Description("Current Date in America/Los_Angeles timezone")
    ValueProvider<String> getLocalDate();
    void setLocalDate(ValueProvider<String> date);

И я хочу запросить все папки в корзине с переданной датой и прочитать ее содержимое.Для этого я использую пользовательский valueProvider

CustomValueProvider valueProvider = new CustomValueProvider(options);

    p.apply(TextIO.read().from(customValueProvider.get()))

    Storage storageOptions = StorageOptions.newBuilder().setProjectId(options.getProjectId()).build().getService();
    BlobListOption listOptions = BlobListOption.currentDirectory();
    Page<Blob> bucketItems = storageOptions.list(options.getCloudStorageBucket(), listOptions);
    for (Blob item : bucketItems.iterateAll()) {
      if (item.isDirectory() && item.getName().contains(localDate)) {
        directoryList.add(item.getName());
      }
    }

И возвращаю первое значение из списка.

Но я получаю

`java.lang.NullPointerException
    at org.apache.beam.sdk.io.LocalFileSystem.matchOne(LocalFileSystem.java:223)
    at org.apache.beam.sdk.io.LocalFileSystem.match(LocalFileSystem.java:90)
    at org.apache.beam.sdk.io.FileSystems.match(FileSystems.java:119)
    at org.apache.beam.sdk.io.FileSystems.match(FileSystems.java:140)
    at org.apache.beam.sdk.io.FileSystems.match(FileSystems.java:152)
    at org.apache.beam.sdk.io.FileBasedSource.split(FileBasedSource.java:262)
    at com.google.cloud.dataflow.worker.WorkerCustomSources.splitAndValidate(WorkerCustomSources.java:275)
    at com.google.cloud.dataflow.worker.WorkerCustomSources.performSplitTyped(WorkerCustomSources.java:197)
    at com.google.cloud.dataflow.worker.WorkerCustomSources.performSplitWithApiLimit(WorkerCustomSources.java:181)
    at com.google.cloud.dataflow.worker.WorkerCustomSources.performSplit(WorkerCustomSources.java:160)
    at com.google.cloud.dataflow.worker.WorkerCustomSourceOperationExecutor.execute(WorkerCustomSourceOperationExecutor.java:77)
    at com.google.cloud.dataflow.worker.BatchDataflowWorker.executeWork(BatchDataflowWorker.java:393)
    at com.google.cloud.dataflow.worker.BatchDataflowWorker.doWork(BatchDataflowWorker.java:362)
    at com.google.cloud.dataflow.worker.BatchDataflowWorker.getAndPerformWork(BatchDataflowWorker.java:290)
    at com.google.cloud.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.doWork(DataflowBatchWorkerHarness.java:134)
    at com.google.cloud.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:114)
    at com.google.cloud.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:101)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)`
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...