При использовании BeamSQL в конвейере потока данных (javacode), после применения SQLTransform.query затем выводится нулевое значение. В приведенном ниже примере кода мы пытаемся прочитать данные из облачного хранилища и применить фильтр с помощью BeamSQL, а затем отправить файл в облачное хранилище. На выходе из облачного хранилища файл пуст.
public static final String HEADER = "artist_credit,position,artist,name,join_phrase"; public static final Schema appSchema = Schema .builder() .addStringField("artist_credit") .addStringField("position") .addStringField("artist") .addStringField("name") .addStringField("join_phrase") .build(); public static void main(String[] args) throws FileNotFoundException, IOException {
PipelineOptionsFactory.register(DefaultOptions.class); DefaultOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(DefaultOptions.class); options.setRunner(DataflowRunner.class); options.setProject(options.getDataFlowProjectName()); options.setTempLocation(options.getDataFlowProjectTempLocation()); options.setGcpTempLocation(options.getDataFlowProjectTempLocation()); Pipeline p = Pipeline.create(options);
p.apply("read_from_gcs", TextIO.read().from ("gs://bq_dataflow_testing/files/artist_credit_name.json"))
.apply("transform_to_row", ParDo.of(new RowParDo())).setRowSchema(appSchema)
.apply("transform_sql", SqlTransform.query("select `artist_credit`, `position`,`artist`,`name`,`join_phrase` from PCOLLECTION where `position` = '1' "))
.apply("transform_to_string", ParDo.of(new RowToString()))
.apply("write_to_gcs", TextIO.write().to("gs://bq_dataflow_testing/file/artist_credit_name.json").withoutSharding());
p.run().waitUntilFinish(); }
public static class RowParDo extends DoFn<String, Row> { @ProcessElement public void processElement(ProcessContext c) { if (!c.element().equalsIgnoreCase(HEADER)) { String[] vals = c.element().split(",(?=(?:[^\"]*\"[^\"]*\")*[^\"]*$)"); Row appRow
= Row.withSchema(appSchema)
.addValue(vals[0])
.addValue(vals[1])
.addValue(vals[2])
.addValue(vals[3])
.addValue(vals[4])
.build(); c.output(appRow); } } }
public static class RowToString extends DoFn<Row, String> { @ProcessElement public void processElement(ProcessContext c) { Row e = c.element(); String line = e.getValues()
.stream()
.map(Object::toString)
.collect(Collectors.joining(",")); c.output(line); } }
Было бы хорошо, если бы кто-нибудь помог мне решить эту проблему.