Unable to write to Google Cloud Storage using Cloud Dataflow
0 votes
25 September 2019

Basically, I am reading data from Pub/Sub and writing it to Google Cloud Storage. The code snippet is shown below.

public class WriteWindowedFile extends PTransform<PCollection<String>, PDone> {

    private String bucketLocation;

    private LogTypeEnum logTypeEnum;

    private int shards;

    public WriteWindowedFile(String bucketLocation, LogTypeEnum logTypeEnum, int shards) {
        this.bucketLocation = bucketLocation;
        this.logTypeEnum = logTypeEnum;
        this.shards = shards;
    }

    @Override
    public PDone expand(PCollection<String> input) {
        checkArgument(input.getWindowingStrategy().getWindowFn().windowCoder() == IntervalWindow.getCoder());

        ResourceId resource = FileBasedSink.convertToFileResourceIfPossible(bucketLocation);

        return input.apply(
                TextIO.write()
                .to(new FileStorageFileNamePolicy(logTypeEnum))
                .withTempDirectory(resource.getCurrentDirectory())
                .withWindowedWrites()
                .withNumShards(shards)
        );
    }
}
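For context, the part of the pipeline that feeds this transform is not shown in the post; it would look roughly like the sketch below. The project, topic, bucket path, window size and the LogTypeEnum constant are placeholders for illustration only, not the actual values used.

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.joda.time.Duration;

public class WindowedFilePipeline {

    public static void main(String[] args) {
        // All concrete values below (project, topic, bucket, window size, enum constant)
        // are hypothetical placeholders.
        Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

        pipeline
                // Read the raw messages as strings from a Pub/Sub topic.
                .apply("ReadFromPubSub",
                        PubsubIO.readStrings().fromTopic("projects/my-project/topics/my-topic"))
                // Window the unbounded stream so that windowed file writes are possible.
                .apply("ApplyFixedWindows",
                        Window.<String>into(FixedWindows.of(Duration.standardMinutes(1))))
                // Apply the custom transform from the question.
                .apply("WriteWindowedFiles",
                        new WriteWindowedFile("gs://my-bucket/logs", LogTypeEnum.APP, 1));

        pipeline.run();
    }
}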

The FilenamePolicy implementation:

public class FileStorageFileNamePolicy extends FileBasedSink.FilenamePolicy {

    private static final long serialVersionUID = 1L;

    private static final Logger LOGGER = LoggerFactory.getLogger(FileStorageFileNamePolicy.class);

    private LogTypeEnum logTypeEnum;

    public FileStorageFileNamePolicy(LogTypeEnum logTypeEnum) {
        this.logTypeEnum = logTypeEnum;
    }

    @Override
    public ResourceId windowedFilename(int shardNumber,
                                       int numShards,
                                       BoundedWindow window,
                                       PaneInfo paneInfo,
                                       FileBasedSink.OutputFileHints outputFileHints) {
        IntervalWindow intervalWindow = (IntervalWindow) window;
        String startDate = intervalWindow.start().toString();
        String dateString = startDate.replace("T", CommonConstants.SPACE)
                .replaceAll(startDate.substring(startDate.indexOf('Z')), CommonConstants.EMPTY_STRING);
        try {
            startDate = DateUtil.getDateForFileStore(dateString, null);
        } catch (ParseException e) {
            LOGGER.error("Error converting date  : {}", e);
        }
        String filename = intervalWindow.start().toString() + ".txt";
        String dirName = startDate + CommonConstants.FORWARD_SLASH +
                logTypeEnum.getValue().toLowerCase() + CommonConstants.FORWARD_SLASH;
        LOGGER.info("Directory : {} and File Name : {}", dirName, filename);
        return FileBasedSink.convertToFileResourceIfPossible(filename)
                .resolve(dirName, ResolveOptions.StandardResolveOptions.RESOLVE_DIRECTORY);
    }

    @Nullable
    @Override
    public ResourceId unwindowedFilename(
            int shardNumber, int numShards, FileBasedSink.OutputFileHints outputFileHints) {
        throw new UnsupportedOperationException("Unsupported");
    }
}
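For reference, ResourceId.resolve() (the call that fails in the stack trace further down) may only be invoked on a resource that represents a directory; each intermediate segment is resolved with RESOLVE_DIRECTORY and only the leaf name with RESOLVE_FILE. A minimal sketch of that resolution order, as a fragment with a hypothetical bucket path and directory names, using org.apache.beam.sdk.io.FileSystems and org.apache.beam.sdk.io.fs.ResolveOptions:

// Each path segment is resolved against a *directory* resource; only the leaf is a file.
// The bucket path and directory names below are hypothetical.
ResourceId baseDir = FileSystems.matchNewResource("gs://my-bucket/logs/", true /* isDirectory */);
ResourceId outputFile = baseDir
        .resolve("2019-09-23", ResolveOptions.StandardResolveOptions.RESOLVE_DIRECTORY)
        .resolve("applog", ResolveOptions.StandardResolveOptions.RESOLVE_DIRECTORY)
        .resolve("2019-09-23T16:59:42.189Z.txt", ResolveOptions.StandardResolveOptions.RESOLVE_FILE);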

When writing to Google Cloud Storage I run into the following problem, even though I pass an actual directory path. I get the stack trace below when the directory is resolved in the FileStorageFileNamePolicy class.

исключение: "java.lang.RuntimeException: org.apache.beam.sdk.util.UserCodeException: java.lang.IllegalStateException: ожидается, что путь является каталогом, но имел [/2019-09-23T16:59:42.189Z.txt]. At org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowsParDoFn $ 1.output (GroupAlsoByWindowsParDoFn.java:18..apache.beam.runners.dataflow.worker..runners.dataflow.worker.GroupAlsoByWindowFnRunner.processElement (GroupAlsoByWindowFnRunner.java:73) по адресу org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowsParDoFn.processElement (GroupAlsoByWindowsParDoFn.java:r.worker.ParDoOperation.process (ParDoOperation.java:44) в org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process (OutputReceiver.java:49) в org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop (ReadOperation.java:201) в org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start (ReadOperation.java:159) вorg.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute (MapTaskExecutor.java:77) в org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process (StreamingDataflowWorker)) в org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access $ 1000 (StreamingDataflowWorker.java:149) в org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker $ 6.run (StreamingDataflowWorker.java:1028) в java.util.concurrent.ThreadPoolExecutor.runWorker (ThreadPoolExecutor.java:1142) в java.util.concurrent.ThreadPoolExecutor $ Worker.run .17lang.Thread.run (Thread.java:745) Причина: org.apache.beam.sdk.util.UserCodeException: java.lang.IllegalStateException: ожидается, что путь является каталогом, но имел [/ 2019-09-23T16:59: 42.189Z.txt].в org.apache.beam.sdk.util.UserCodeException.wrap (UserCodeException.java:34) в org.apache.beam.sdk.io.WriteFiles $ FinalizeTempFileBundles $ FinalizeFn $ DoFnInvoker.invokeProcessElement (неизвестный источник) в org.ap.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement (SimpleDoFnRunner.java:214) в org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.processElement (SimpleDoFnRunner.java:179) в org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processElement (SimpleParDoFn.java:330) в org.apache.beam.ata.util.common.worker.ParDoOperation.process (ParDoOperation.java:44) в org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process (OutputReceiver.java:49) в org.apache.beam.runners.dataflow.worker.SimpleParDoFn $ 1.output (SimpleParDoFn.java:276) в org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue (SimpleDoFnRunner.java:248) вorg.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.access $ 700 (SimpleDoFnRunner.java:74) в org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.beam.sdk.transforms.MapElements $ 1.processElement (MapElements.java:139) в org.apache.beam.sdk.transforms.MapElements $ 1 $ DoFnInvoker.invokeProcessElement (неизвестный источник) в org.apache.beam.runners.dataflow..repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement (SimpleDoFnRunner.java:214) в org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.Rimple .DoFprocessElement (SimpleDoFnRunner.java:179) в org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processElement (SimpleParDoFn.java:330) в 
org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process (ParDoOperation.java:44) в org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process (OutputReceiver.java:49) в org.apache.beam.runners.dataflow.worker.SimpleParDoFn $ 1.output (SimpleParDoFn.java:276) в org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue (SimpleDoFnRunner.java:248) в org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.access $ 700 (SimpleDoFnRunner.jap at..beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner $ DoFnProcessContext.output (SimpleDoFnRunner.java:560) в org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner $ DoFnProcessContext.output (SimpleDoFnRunner.java:548) в org.apache.beam.runners.dataflow.ReshuffleOverrideFactory $ ReshuffleWithOnlyTrigger $ 1.processE.java: 86) at org.apache.beam.runners.dataflow.ReshuffleOverrideFactory $ ReshuffleWithOnlyTrigger $ 1 $ DoFnInvoker.invokeProcessElement (Неизвестный источник) в org.apache.beam.runners.dataflow.worker.apunam.org.core.SimpleDoFnRunner.invokeProcessElement (SimpleDoFnRunner.java:214) в org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.processElement: org в SimpleDoF (простой).apache.beam.runners.dataflow.worker.SimpleParDoFn.processElement (SimpleParDoFn.java:330) в org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process (ParDoOperation.java:44)в org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process (OutputReceiver.java:49) в org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowsParDoFn $ 1.output (GroupAlsoFWindows): 182) ... еще 17 Причины: java.lang.IllegalStateException: Ожидаемый путь является каталогом, но имел [/2019-09-23T16:59:42.189Z.txt].в org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState (Preconditions.java:588) в org.apache.beam.sdk.io.LocalResourceId.resolve (LocalResourceId.java:57) в org.apache.beam.sdk.io.LocalResourceId.resolve (LocalResourceId.java:36) в com.vuclip.dataflow.pipeline.helper.FileStorageFileNamePolicy.windowedFilename (FileStorageFileNamePolicy.japap):.sdk.io.FileBasedSink $ FileResult.getDestinationFile (FileBasedSink.java:1086) в org.apache.beam.sdk.io.FileBasedSink $ WriteOperation.finalizeDestination (FileBasedSink.java:645) в org.apache.beam.sdkio.WriteFiles.finalizeAllDestination (WriteFiles.java:872) по адресу org.apache.beam.sdk.io.WriteFiles.access $ 1600 (WriteFiles.java:111) по адресуorg.apache.beam.sdk.io.WriteFiles $ FinalizeTempFileBundles $ FinalizeFn.process (WriteFiles.java:849)

Can anyone help, please? Thanks.
