Конвейер DataFlow застрял при инициализации tempLocation? - PullRequest
0 голосов
/ 18 января 2019

Я новичок в DataFlow и пытаюсь настроить потоковый конвейер чтения файлов CSV из Google Cloud Storage в BigQuery. Конвейер успешно создан, файлы CSV читаются и анализируются. Однако весь конвейер не инициализирован должным образом. Поэтому в BigQuery данные не загружаются.

Я использую Java 8 и Apache Beam 2.5.0.

Когда я просматриваю график выполнения DataFlow, я вижу, что блок Write to bigquery/BatchLoads/TempFilePrefixView/Combine.GloballyAsSingletonView/View.CreatePCollectionView/ParDo(StreamingPCollectionViewWriter) получает входные данные, но никогда не выплевывает какие-либо выходные данные. Из-за этого следующий шаг Write to bigquery/BatchLoads/TempFilePrefixView/Combine.GloballyAsSingletonView/View.CreatePCollectionView/CreateDataflowView никогда не выполняется.

Я "вдохновлен" кодом, например https://github.com/asaharland/beam-pipeline-examples/blob/master/src/main/java/com/harland/example/streaming/StreamingFilePipeline.java

public class MyStreamPipeline {

    private static final Logger LOG = LoggerFactory.getLogger(MyStreamPipeline.class);

    private static final int WINDOW_SIZE_SECONDS = 120;

    public interface MyOptions extends PipelineOptions, GcpOptions {
        @Description("BigQuery Table Spec project_id:dataset_id.table_id")
        ValueProvider<String> getBigQueryTableSpec();
        void setBigQueryTableSpec(ValueProvider<String> value);

        @Description("Google Cloud Storage Bucket Name")
        ValueProvider<String> getBucketUrl();
        void setBucketUrl(ValueProvider<String> value);
    }


    public static void main(String[] args) throws IOException {
        MyOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(MyOptions.class);
        Pipeline pipeline = Pipeline.create(options);

        List<TableFieldSchema> tableFields = new ArrayList<>();
        tableFields.add(new TableFieldSchema().setName("FIELD_NAME").setType("INTEGER"));
        // ... more fields here ...
        TableSchema schema = new TableSchema().setFields(tableFields);

        pipeline
        .apply("Read CSV as string from Google Cloud Storage", 
            TextIO
                .read()
                .from(options.getBucketUrl() + "/**")
                .watchForNewFiles(
                    // Check for new files every 1 minute(s)
                    Duration.standardMinutes(1),
                    // Never stop checking for new files
                    Watch.Growth.never())
                )
        .apply(String.format("Window Into %d Second Windows", WINDOW_SIZE_SECONDS),
            Window.into(FixedWindows.of(Duration.standardSeconds(WINDOW_SIZE_SECONDS))))
        .apply("Convert CSV string to Record", 
            ParDo.of(new CsvToRecordFn()))
        .apply("Record to TableRow",
            ParDo.of(new DoFn<Record, TableRow>() {
                @ProcessElement
                public void processElement(ProcessContext c)  {
                    Record record = c.element();
                    TableRow tr = record.getTableRow();
                    c.output(tr);
                    return;}}))
        .apply("Write to bigquery", 
            BigQueryIO
                .writeTableRows()
                .to(options.getBigQueryTableSpec())
                .withSchema(schema)
                .withTimePartitioning(new TimePartitioning().setField("PARTITION_FIELD_NAME").setType("DAY"))
                .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
                .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));

        pipeline.run();
    }
}

Я выполняю конвейер с помощью команды Maven, подобной этой:

mvn -f pom_2-5-0.xml clean compile exec:java \
      -Dexec.mainClass=com.organization.processor.gcs.myproject.MyStreamPipeline \
      -Dexec.args=" \
      --project=$PROJECT_ID \
      --stagingLocation=gs://$PROJECT_ID-processor/$VERSION/staging \
      --tempLocation=gs://$PROJECT_ID-processor/$VERSION/temp/ \
      --gcpTempLocation=gs://$PROJECT_ID-processor/$VERSION/gcptemp/ \
      --runner=DataflowRunner \
      --zone=$DF_ZONE \
      --region=$DF_REGION \
      --numWorkers=$DF_NUM_WORKERS \
      --maxNumWorkers=$DF_MAX_NUM_WORKERS \
      --diskSizeGb=$DF_DISK_SIZE_GB \
      --workerMachineType=$DF_WORKER_MACHINE_TYPE \
      --bucketUrl=$GCS_BUCKET_URL \
      --bigQueryTableSpec=$PROJECT_ID:$BQ_TABLE_SPEC \
      --streaming"

Я действительно не понимаю, почему конвейер не инициализирован должным образом и почему данные не загружаются в BigQuery.

Любая помощь приветствуется!

...