Я новичок в DataFlow и пытаюсь настроить потоковый конвейер чтения файлов CSV из Google Cloud Storage в BigQuery. Конвейер успешно создан, файлы CSV читаются и анализируются. Однако весь конвейер не инициализирован должным образом. Поэтому в BigQuery данные не загружаются.
Я использую Java 8 и Apache Beam 2.5.0.
Когда я просматриваю график выполнения DataFlow, я вижу, что блок Write to bigquery/BatchLoads/TempFilePrefixView/Combine.GloballyAsSingletonView/View.CreatePCollectionView/ParDo(StreamingPCollectionViewWriter)
получает входные данные, но никогда не выплевывает какие-либо выходные данные. Из-за этого следующий шаг Write to bigquery/BatchLoads/TempFilePrefixView/Combine.GloballyAsSingletonView/View.CreatePCollectionView/CreateDataflowView
никогда не выполняется.
Я "вдохновлен" кодом, например
https://github.com/asaharland/beam-pipeline-examples/blob/master/src/main/java/com/harland/example/streaming/StreamingFilePipeline.java
public class MyStreamPipeline {
private static final Logger LOG = LoggerFactory.getLogger(MyStreamPipeline.class);
private static final int WINDOW_SIZE_SECONDS = 120;
public interface MyOptions extends PipelineOptions, GcpOptions {
@Description("BigQuery Table Spec project_id:dataset_id.table_id")
ValueProvider<String> getBigQueryTableSpec();
void setBigQueryTableSpec(ValueProvider<String> value);
@Description("Google Cloud Storage Bucket Name")
ValueProvider<String> getBucketUrl();
void setBucketUrl(ValueProvider<String> value);
}
public static void main(String[] args) throws IOException {
MyOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(MyOptions.class);
Pipeline pipeline = Pipeline.create(options);
List<TableFieldSchema> tableFields = new ArrayList<>();
tableFields.add(new TableFieldSchema().setName("FIELD_NAME").setType("INTEGER"));
// ... more fields here ...
TableSchema schema = new TableSchema().setFields(tableFields);
pipeline
.apply("Read CSV as string from Google Cloud Storage",
TextIO
.read()
.from(options.getBucketUrl() + "/**")
.watchForNewFiles(
// Check for new files every 1 minute(s)
Duration.standardMinutes(1),
// Never stop checking for new files
Watch.Growth.never())
)
.apply(String.format("Window Into %d Second Windows", WINDOW_SIZE_SECONDS),
Window.into(FixedWindows.of(Duration.standardSeconds(WINDOW_SIZE_SECONDS))))
.apply("Convert CSV string to Record",
ParDo.of(new CsvToRecordFn()))
.apply("Record to TableRow",
ParDo.of(new DoFn<Record, TableRow>() {
@ProcessElement
public void processElement(ProcessContext c) {
Record record = c.element();
TableRow tr = record.getTableRow();
c.output(tr);
return;}}))
.apply("Write to bigquery",
BigQueryIO
.writeTableRows()
.to(options.getBigQueryTableSpec())
.withSchema(schema)
.withTimePartitioning(new TimePartitioning().setField("PARTITION_FIELD_NAME").setType("DAY"))
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
pipeline.run();
}
}
Я выполняю конвейер с помощью команды Maven, подобной этой:
mvn -f pom_2-5-0.xml clean compile exec:java \
-Dexec.mainClass=com.organization.processor.gcs.myproject.MyStreamPipeline \
-Dexec.args=" \
--project=$PROJECT_ID \
--stagingLocation=gs://$PROJECT_ID-processor/$VERSION/staging \
--tempLocation=gs://$PROJECT_ID-processor/$VERSION/temp/ \
--gcpTempLocation=gs://$PROJECT_ID-processor/$VERSION/gcptemp/ \
--runner=DataflowRunner \
--zone=$DF_ZONE \
--region=$DF_REGION \
--numWorkers=$DF_NUM_WORKERS \
--maxNumWorkers=$DF_MAX_NUM_WORKERS \
--diskSizeGb=$DF_DISK_SIZE_GB \
--workerMachineType=$DF_WORKER_MACHINE_TYPE \
--bucketUrl=$GCS_BUCKET_URL \
--bigQueryTableSpec=$PROJECT_ID:$BQ_TABLE_SPEC \
--streaming"
Я действительно не понимаю, почему конвейер не инициализирован должным образом и почему данные не загружаются в BigQuery.
Любая помощь приветствуется!