BeamSQL - выходной файл пуст - PullRequest
0 голосов
/ 25 апреля 2019

При использовании BeamSQL в конвейере потока данных (javacode), после применения SQLTransform.query затем выводится нулевое значение. В приведенном ниже примере кода мы пытаемся прочитать данные из облачного хранилища и применить фильтр с помощью BeamSQL, а затем отправить файл в облачное хранилище. На выходе из облачного хранилища файл пуст.

public static final String HEADER = "artist_credit,position,artist,name,join_phrase";  public static final Schema appSchema =       Schema      .builder()      .addStringField("artist_credit")        .addStringField("position")         .addStringField("artist")       .addStringField("name")         .addStringField("join_phrase")      .build(); public static void main(String[] args) throws FileNotFoundException, IOException {

        PipelineOptionsFactory.register(DefaultOptions.class);  DefaultOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(DefaultOptions.class);   options.setRunner(DataflowRunner.class);    options.setProject(options.getDataFlowProjectName());   options.setTempLocation(options.getDataFlowProjectTempLocation());  options.setGcpTempLocation(options.getDataFlowProjectTempLocation());   Pipeline p = Pipeline.create(options);

p.apply("read_from_gcs",  TextIO.read().from ("gs://bq_dataflow_testing/files/artist_credit_name.json"))
        .apply("transform_to_row", ParDo.of(new RowParDo())).setRowSchema(appSchema)
        .apply("transform_sql", SqlTransform.query("select `artist_credit`, `position`,`artist`,`name`,`join_phrase` from PCOLLECTION where `position` = '1' "))
        .apply("transform_to_string", ParDo.of(new RowToString()))
        .apply("write_to_gcs", TextIO.write().to("gs://bq_dataflow_testing/file/artist_credit_name.json").withoutSharding());
         p.run().waitUntilFinish();  } 

public static class RowParDo extends DoFn<String, Row> {     @ProcessElement     public void processElement(ProcessContext c) {          if (!c.element().equalsIgnoreCase(HEADER))  {           String[] vals = c.element().split(",(?=(?:[^\"]*\"[^\"]*\")*[^\"]*$)");            Row appRow
    = Row.withSchema(appSchema)
                        .addValue(vals[0])
                        .addValue(vals[1])
                        .addValue(vals[2])
                        .addValue(vals[3])
                        .addValue(vals[4])
                        .build();           c.output(appRow);       }   } }

    public static class RowToString extends DoFn<Row, String> {     @ProcessElement      public void processElement(ProcessContext c) {          Row e = c.element();        String line = e.getValues()
                    .stream()
                    .map(Object::toString)
                    .collect(Collectors.joining(","));      c.output(line);     } }

Было бы хорошо, если бы кто-нибудь помог мне решить эту проблему.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...