java.lang.IllegalArgumentException: вызов sideInput () с неизвестным представлением - PullRequest
0 голосов
/ 14 ноября 2018

Я пытался переместить данные из одной таблицы в другую.Использовал SideInput для фильтрации записей при преобразовании данных.SideInput также тип коллекции KV и загружает данные из другой таблицы.

При запуске мой конвейер получил ошибку «java.lang.IllegalArgumentException: вызов функции sideInput () с неизвестным представлением».

Вот весь код, который я пробовал:

{
PipelineOptionsFactory.register(OptionPipeline.class);

OptionPipeline options = PipelineOptionsFactory.fromArgs(args).withValidation().as(OptionPipeline.class);

Pipeline p = Pipeline.create(options);

PCollection<TableRow> sideInputData = p.apply("ReadSideInput",BigQueryIO.readTableRows().from(options.getOrgRegionMapping()));
PCollection<KV<String,String>> sideInputMap = sideInputData.apply(ParDo.of(new getSideInputDataFn()));
final PCollectionView<Map<String,String>> sideInputView = sideInputMap.apply(View.<String,String>asMap());



PCollection<TableRow> orgMaster = p.apply("ReadOrganization",BigQueryIO.readTableRows().from(options.getOrgCodeMaster()));
PCollection<TableRow> orgCode = orgMaster.apply(ParDo.of(new gnGetOrgMaster()));


@SuppressWarnings("serial")
PCollection<TableRow> finalResultCollection =  orgCode.apply("Process", ParDo.of(new DoFn<TableRow, TableRow>() 
{
      @ProcessElement
      public void processElement(ProcessContext c) {

          TableRow outputRow = new TableRow();

          TableRow orgCodeRow = c.element();
          String orgCodefromMaster = (String) orgCodeRow.get("orgCode");

          String region = c.sideInput(sideInputView).get(orgCodefromMaster);

          outputRow.set("orgCode", orgCodefromMaster);
          outputRow.set("orgName", orgCodeRow.get("orgName"));
          outputRow.set("orgName", region);
          DateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSSSSS");
          Date dateobj = new Date();
          outputRow.set("updatedDate",df.format(dateobj));

          c.output(outputRow);
      }
}));


finalResultCollection.apply(BigQueryIO.writeTableRows()
                     .withSchema(schema)
                     .to(options.getOrgCodeTable())
                     .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
                     .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));

p.run().waitUntilFinish();
}
@SuppressWarnings("serial")
static class getSideInputDataFn extends DoFn<TableRow,KV<String, String>>
{
    @ProcessElement
    public void processElement(ProcessContext c)
    {
        TableRow row = c.element();
        c.output(KV.of((String) row.get("orgcode"), (String) row.get("region")));
    }
}

1 Ответ

0 голосов
/ 06 декабря 2018

Похоже, что бегун жалуется, потому что вы никогда не говорили ему о боковом вводе при определении графика.В этом случае вы звоните .withSideInputs после вызова ParDo.of, передающего ссылку на PCollectionView<T>, который вы определили ранее.

@SuppressWarnings("serial")
PCollection<TableRow> finalResultCollection =  orgCode.apply("Process", ParDo.of(new DoFn<TableRow, TableRow>()
{
    @ProcessElement
    public void processElement(ProcessContext c) {

        TableRow outputRow = new TableRow();

        TableRow orgCodeRow = c.element();
        String orgCodefromMaster = (String) orgCodeRow.get("orgCode");

        String region = c.sideInput(sideInputView).get(orgCodefromMaster);

        outputRow.set("orgCode", orgCodefromMaster);
        outputRow.set("orgName", orgCodeRow.get("orgName"));
        outputRow.set("orgName", region);
        DateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSSSSS");
        Date dateobj = new Date();
        outputRow.set("updatedDate",df.format(dateobj));

        c.output(outputRow);
    }
}).withSideInputs(sideInputView));

Я не тестировал этот код, но это то, что выделяется, когда япосмотри на это.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...